rshttpsrc: Making recieving async/cancellable

This commit is contained in:
ahamedsajeer.15 2019-06-25 01:41:11 +05:30
parent 7948b266c5
commit 7679becc66
3 changed files with 144 additions and 39 deletions

View file

@ -5,14 +5,18 @@ authors = ["Sebastian Dröge <sebastian@centricular.com>"]
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
license = "MIT/Apache-2.0" license = "MIT/Apache-2.0"
description = "Rust HTTP Plugin" description = "Rust HTTP Plugin"
edition = "2018"
[dependencies] [dependencies]
url = "1.1" url = "1.1"
glib = { git = "https://github.com/gtk-rs/glib" } glib = { git = "https://github.com/gtk-rs/glib" }
reqwest = "0.9" reqwest = "0.9"
futures = "0.1.23"
hyperx = "0.15" hyperx = "0.15"
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] } gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
tokio = "0.1"
bytes = "0.4"
[lib] [lib]
name = "gstrshttp" name = "gstrshttp"

View file

@ -6,14 +6,18 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
use bytes::Bytes;
use futures::sync::oneshot;
use futures::{Future, Stream};
use hyperx::header::{ use hyperx::header::{
AcceptRanges, ByteRangeSpec, ContentLength, ContentRange, ContentRangeSpec, Headers, Range, AcceptRanges, ByteRangeSpec, ContentLength, ContentRange, ContentRangeSpec, Headers, Range,
RangeUnit, RangeUnit,
}; };
use reqwest::{Client, Response}; use reqwest::r#async::{Client, Decoder};
use std::io::Read; use std::mem;
use std::sync::Mutex; use std::sync::Mutex;
use std::u64; use std::u64;
use tokio::runtime;
use url::Url; use url::Url;
use glib; use glib;
@ -56,7 +60,7 @@ enum State {
Stopped, Stopped,
Started { Started {
uri: Url, uri: Url,
response: Response, body: Option<Decoder>,
seekable: bool, seekable: bool,
position: u64, position: u64,
size: Option<u64>, size: Option<u64>,
@ -77,6 +81,8 @@ pub struct HttpSrc {
client: Client, client: Client,
settings: Mutex<Settings>, settings: Mutex<Settings>,
state: Mutex<State>, state: Mutex<State>,
runtime: runtime::Runtime,
canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
} }
impl HttpSrc { impl HttpSrc {
@ -137,13 +143,25 @@ impl HttpSrc {
gst_debug!(cat, obj: src, "Doing new request {:?}", req); gst_debug!(cat, obj: src, "Doing new request {:?}", req);
let response = try!(req.send().or_else(|err| { let response_fut = req.send().and_then(|res| {
gst_error!(cat, obj: src, "Request failed: {:?}", err); // gst_debug!(cat, obj: src, "Response received: {:?}", res);
Err(gst_error_msg!( Ok(res)
gst::ResourceError::Read, });
["Failed to fetch {}: {}", uri, err.to_string()]
)) let uri_clone = uri.clone();
})); let mut response = self
.wait(response_fut.map_err(move |err| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to fetch {}: {:?}", uri_clone, err]
)
}))
.map_err(|err| {
err.unwrap_or(gst_error_msg!(
gst::LibraryError::Failed,
["Interrupted during start"]
))
})?;
if !response.status().is_success() { if !response.status().is_success() {
gst_error!(cat, obj: src, "Request status failed: {:?}", response); gst_error!(cat, obj: src, "Request status failed: {:?}", response);
@ -183,9 +201,11 @@ impl HttpSrc {
gst_debug!(cat, obj: src, "Request successful: {:?}", response); gst_debug!(cat, obj: src, "Request successful: {:?}", response);
let body = mem::replace(response.body_mut(), Decoder::empty());
Ok(State::Started { Ok(State::Started {
uri, uri,
response, body: Some(body),
seekable, seekable,
position: 0, position: 0,
size, size,
@ -193,6 +213,36 @@ impl HttpSrc {
stop, stop,
}) })
} }
fn wait<F>(&self, future: F) -> Result<F::Item, Option<gst::ErrorMessage>>
where
F: Send + Future<Error = gst::ErrorMessage> + 'static,
F::Item: Send,
{
let mut canceller = self.canceller.lock().unwrap();
let (sender, receiver) = oneshot::channel::<Bytes>();
canceller.replace(sender);
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
let res = oneshot::spawn(future, &self.runtime.executor())
.select(receiver.then(|_| Err(unlock_error.clone())))
.wait()
.map(|v| v.0)
.map_err(|err| {
if err.0 == unlock_error {
None
} else {
Some(err.0)
}
});
/* Clear out the canceller */
*canceller = None;
res
}
} }
impl ObjectImpl for HttpSrc { impl ObjectImpl for HttpSrc {
@ -235,8 +285,8 @@ impl ObjectImpl for HttpSrc {
fn constructed(&self, obj: &glib::Object) { fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj); self.parent_constructed(obj);
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap(); let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
element.set_automatic_eos(false);
element.set_format(gst::Format::Bytes); element.set_format(gst::Format::Bytes);
} }
} }
@ -324,21 +374,21 @@ impl BaseSrcImpl for HttpSrc {
} }
} }
fn fill( fn create(
&self, &self,
src: &gst_base::BaseSrc, src: &gst_base::BaseSrc,
offset: u64, offset: u64,
_: u32, _length: u32,
buffer: &mut gst::BufferRef, ) -> Result<gst::Buffer, gst::FlowError> {
) -> Result<gst::FlowSuccess, gst::FlowError> { let cat = self.cat;
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let (response, position) = match *state { let (body, position) = match *state {
State::Started { State::Started {
ref mut response, ref mut body,
ref mut position, ref mut position,
.. ..
} => (response, position), } => (body, position),
State::Stopped => { State::Stopped => {
gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]); gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
@ -356,36 +406,78 @@ impl BaseSrcImpl for HttpSrc {
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
let size = { let current_body = match body.take() {
let mut map = buffer.map_writable().ok_or_else(|| { Some(body) => body,
gst_element_error!(src, gst::LibraryError::Failed, ["Failed to map buffer"]); None => {
gst_error!(self.cat, obj: src, "Don't have a response body");
gst_element_error!(
src,
gst::ResourceError::Read,
["Don't have a response body"]
);
gst::FlowError::Error return Err(gst::FlowError::Error);
})?; }
};
let data = map.as_mut_slice(); drop(state);
let res = self.wait(current_body.into_future().map_err(|(err, _body)| {
gst_error_msg!(
gst::ResourceError::Read,
["Failed to read chunk: {:?}", err]
)
}));
let mut state = self.state.lock().unwrap();
let (body, position) = match *state {
State::Started {
ref mut body,
ref mut position,
..
} => (body, position),
State::Stopped => {
gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
}
};
match res {
Ok((Some(chunk), current_body)) => {
/* do something with the chunk and store the body again in the state */
gst_debug!(cat, obj: src, "Data Received {:?}", chunk);
let size = chunk.len();
assert_ne!(chunk.len(), 0);
*position += size as u64;
let buffer = gst::Buffer::from_slice(chunk);
*body = Some(current_body);
return Ok(buffer);
}
Ok((None, current_body)) => {
/* No further data, end of stream */
gst_debug!(cat, obj: src, "End of stream");
*body = Some(current_body);
// src.set_automatic_eos(false);
return Err(gst::FlowError::Eos);
}
Err(err) => {
/* error */
response.read(data).map_err(|err| {
gst_error!(self.cat, obj: src, "Failed to read: {:?}", err); gst_error!(self.cat, obj: src, "Failed to read: {:?}", err);
gst_element_error!( gst_element_error!(
src, src,
gst::ResourceError::Read, gst::ResourceError::Read,
["Failed to read at {}: {}", offset, err.to_string()] ["Failed to read at {}: {:?}", offset, err]
); );
gst::FlowError::Error return Err(gst::FlowError::Error);
})? }
};
if size == 0 {
return Err(gst::FlowError::Eos);
} }
*position += size as u64;
buffer.set_size(size);
Ok(gst::FlowSuccess::Ok)
} }
} }
@ -429,6 +521,12 @@ impl ObjectSubclass for HttpSrc {
client: Client::new(), client: Client::new(),
settings: Mutex::new(Default::default()), settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()), state: Mutex::new(Default::default()),
runtime: runtime::Builder::new()
.core_threads(1)
.name_prefix("gst-http-tokio")
.build()
.unwrap(),
canceller: Mutex::new(None),
} }
} }

View file

@ -12,9 +12,12 @@
extern crate glib; extern crate glib;
#[macro_use] #[macro_use]
extern crate gstreamer as gst; extern crate gstreamer as gst;
extern crate bytes;
extern crate futures;
extern crate gstreamer_base as gst_base; extern crate gstreamer_base as gst_base;
extern crate hyperx; extern crate hyperx;
extern crate reqwest; extern crate reqwest;
extern crate tokio;
extern crate url; extern crate url;
mod httpsrc; mod httpsrc;