httpsrc: Port to new subclassing API

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/issues/50
This commit is contained in:
Sebastian Dröge 2018-12-30 12:03:32 +02:00 committed by Sebastian Dröge
parent 8ad7643ec3
commit 1e39927037
3 changed files with 310 additions and 128 deletions

View file

@ -8,11 +8,10 @@ license = "MIT/Apache-2.0"
[dependencies]
url = "1.1"
glib = { git = "https://github.com/gtk-rs/glib" }
gst-plugin = { path="../gst-plugin" }
gst-plugin-simple = { path="../gst-plugin-simple" }
reqwest = "0.9"
hyperx = "0.13"
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing"] }
[lib]
name = "gstrshttp"

View file

@ -1,4 +1,4 @@
// Copyright (C) 2016-2017 Sebastian Dröge <sebastian@centricular.com>
// Copyright (C) 2016-2018 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
@ -12,19 +12,47 @@ use hyperx::header::{
};
use reqwest::{Client, Response};
use std::io::Read;
use std::sync::Mutex;
use std::u64;
use url::Url;
use gst_plugin::error::*;
use gst_plugin_simple::error::*;
use gst_plugin_simple::source::*;
use gst_plugin_simple::UriValidator;
use glib;
use glib::subclass;
use glib::subclass::prelude::*;
use gst;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base;
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
const DEFAULT_LOCATION: Option<Url> = None;
#[derive(Debug, Clone)]
struct Settings {
location: Option<Url>,
}
impl Default for Settings {
fn default() -> Self {
Settings {
location: DEFAULT_LOCATION,
}
}
}
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("location", |name| {
glib::ParamSpec::string(
name,
"File Location",
"URL to read from",
None,
glib::ParamFlags::READWRITE,
)
})];
#[derive(Debug)]
enum StreamingState {
enum State {
Stopped,
Started {
uri: Url,
@ -37,37 +65,70 @@ enum StreamingState {
},
}
impl Default for State {
fn default() -> Self {
State::Stopped
}
}
#[derive(Debug)]
pub struct HttpSrc {
streaming_state: StreamingState,
cat: gst::DebugCategory,
client: Client,
settings: Mutex<Settings>,
state: Mutex<State>,
}
impl HttpSrc {
pub fn new(_src: &BaseSrc) -> HttpSrc {
HttpSrc {
streaming_state: StreamingState::Stopped,
cat: gst::DebugCategory::new(
"rshttpsrc",
gst::DebugColorFlags::empty(),
"Rust HTTP source",
),
client: Client::new(),
fn set_location(
&self,
_element: &gst_base::BaseSrc,
uri: Option<String>,
) -> Result<(), glib::Error> {
let state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
return Err(glib::Error::new(
gst::URIError::BadState,
"Changing the `location` property on a started `httpsrc` is not supported",
));
}
}
pub fn new_boxed(src: &BaseSrc) -> Box<SourceImpl> {
Box::new(HttpSrc::new(src))
let mut settings = self.settings.lock().unwrap();
let uri = match uri {
Some(uri) => uri,
None => {
settings.location = None;
return Ok(());
}
};
let uri = Url::parse(uri.as_str()).map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
format!("Failed to parse URI '{}': {:?}", uri, err,).as_str(),
)
})?;
if uri.scheme() != "http" && uri.scheme() != "https" {
return Err(glib::Error::new(
gst::URIError::UnsupportedProtocol,
format!("Unsupported URI scheme '{}'", uri.scheme(),).as_str(),
));
}
settings.location = Some(uri);
Ok(())
}
fn do_request(
&self,
src: &BaseSrc,
src: &gst_base::BaseSrc,
uri: Url,
start: u64,
stop: Option<u64>,
) -> Result<StreamingState, gst::ErrorMessage> {
) -> Result<State, gst::ErrorMessage> {
let cat = self.cat;
let req = self.client.get(uri.clone());
@ -134,7 +195,7 @@ impl HttpSrc {
gst_debug!(cat, obj: src, "Request successful: {:?}", response);
Ok(StreamingState::Started {
Ok(State::Started {
uri,
response,
seekable,
@ -146,140 +207,282 @@ impl HttpSrc {
}
}
fn validate_uri(uri: &Url) -> Result<(), UriError> {
if uri.scheme() != "http" && uri.scheme() != "https" {
return Err(UriError::new(
gst::URIError::UnsupportedProtocol,
format!("Unsupported URI '{}'", uri.as_str()),
));
impl ObjectImpl for HttpSrc {
glib_object_impl!();
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
let location = value.get::<String>();
let res = self.set_location(element, location);
if let Err(err) = res {
gst_error!(
self.cat,
obj: element,
"Failed to set property `location`: {:?}",
err
);
}
}
_ => unimplemented!(),
};
}
Ok(())
fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
let prop = &PROPERTIES[id];
match *prop {
subclass::Property("location", ..) => {
let settings = self.settings.lock().unwrap();
let location = settings.location.as_ref().map(Url::to_string);
Ok(location.to_value())
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &glib::Object) {
self.parent_constructed(obj);
let element = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
element.set_format(gst::Format::Bytes);
}
}
impl SourceImpl for HttpSrc {
fn uri_validator(&self) -> Box<UriValidator> {
Box::new(validate_uri)
}
impl ElementImpl for HttpSrc {}
fn is_seekable(&self, _src: &BaseSrc) -> bool {
match self.streaming_state {
StreamingState::Started { seekable, .. } => seekable,
impl BaseSrcImpl for HttpSrc {
fn is_seekable(&self, _src: &gst_base::BaseSrc) -> bool {
match *self.state.lock().unwrap() {
State::Started { seekable, .. } => seekable,
_ => false,
}
}
fn get_size(&self, _src: &BaseSrc) -> Option<u64> {
match self.streaming_state {
StreamingState::Started { size, .. } => size,
fn get_size(&self, _src: &gst_base::BaseSrc) -> Option<u64> {
match *self.state.lock().unwrap() {
State::Started { size, .. } => size,
_ => None,
}
}
fn start(&mut self, src: &BaseSrc, uri: Url) -> Result<(), gst::ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(src, uri, 0, None));
fn start(&self, src: &gst_base::BaseSrc) -> bool {
let mut state = self.state.lock().unwrap();
Ok(())
*state = State::Stopped;
let uri = match self.settings.lock().unwrap().location {
Some(ref uri) => uri.clone(),
None => {
gst_element_error!(
src,
gst::CoreError::StateChange,
["Can't start without an URI"]
);
return false;
}
};
match self.do_request(src, uri, 0, None) {
Ok(s) => {
*state = s;
true
}
Err(err) => {
src.post_error_message(&err);
false
}
}
}
fn stop(&mut self, _src: &BaseSrc) -> Result<(), gst::ErrorMessage> {
self.streaming_state = StreamingState::Stopped;
fn stop(&self, _src: &gst_base::BaseSrc) -> bool {
*self.state.lock().unwrap() = State::Stopped;
Ok(())
true
}
fn seek(
&mut self,
src: &BaseSrc,
start: u64,
stop: Option<u64>,
) -> Result<(), gst::ErrorMessage> {
let (position, old_stop, uri) = match self.streaming_state {
StreamingState::Started {
fn do_seek(&self, src: &gst_base::BaseSrc, segment: &mut gst::Segment) -> bool {
let segment = segment.downcast_mut::<gst::format::Bytes>().unwrap();
let mut state = self.state.lock().unwrap();
let (position, old_stop, uri) = match *state {
State::Started {
position,
stop,
ref uri,
..
} => (position, stop, uri.clone()),
StreamingState::Stopped => {
return Err(gst_error_msg!(
gst::LibraryError::Failed,
["Not started yet"]
));
State::Stopped => {
gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
return false;
}
};
if position == start && old_stop == stop {
return Ok(());
let start = segment.get_start().expect("No start position given");
let stop = segment.get_stop();
if position == start && old_stop == stop.0 {
return true;
}
self.streaming_state = StreamingState::Stopped;
self.streaming_state = try!(self.do_request(src, uri, start, stop));
Ok(())
*state = State::Stopped;
match self.do_request(src, uri, start, stop.0) {
Ok(s) => {
*state = s;
true
}
Err(err) => {
src.post_error_message(&err);
false
}
}
}
fn fill(
&mut self,
src: &BaseSrc,
&self,
src: &gst_base::BaseSrc,
offset: u64,
_: u32,
buffer: &mut gst::BufferRef,
) -> Result<(), FlowError> {
let cat = self.cat;
) -> gst::FlowReturn {
let mut state = self.state.lock().unwrap();
let (response, position) = match self.streaming_state {
StreamingState::Started {
let (response, position) = match *state {
State::Started {
ref mut response,
ref mut position,
..
} => (response, position),
StreamingState::Stopped => {
return Err(FlowError::Error(gst_error_msg!(
gst::LibraryError::Failed,
["Not started yet"]
)));
State::Stopped => {
gst_element_error!(src, gst::LibraryError::Failed, ["Not started yet"]);
return gst::FlowReturn::Error;
}
};
if *position != offset {
return Err(FlowError::Error(gst_error_msg!(
gst_element_error!(
src,
gst::ResourceError::Seek,
["Got unexpected offset {}, expected {}", offset, position]
)));
);
return gst::FlowReturn::Error;
}
let size = {
let mut map = match buffer.map_writable() {
None => {
return Err(FlowError::Error(gst_error_msg!(
gst::LibraryError::Failed,
["Failed to map buffer"]
)));
gst_element_error!(src, gst::LibraryError::Failed, ["Failed to map buffer"]);
return gst::FlowReturn::Error;
}
Some(map) => map,
};
let data = map.as_mut_slice();
try!(response.read(data).or_else(|err| {
gst_error!(cat, obj: src, "Failed to read: {:?}", err);
Err(FlowError::Error(gst_error_msg!(
gst::ResourceError::Read,
["Failed to read at {}: {}", offset, err.to_string()]
)))
}))
match response.read(data) {
Ok(size) => size,
Err(err) => {
gst_error!(self.cat, obj: src, "Failed to read: {:?}", err);
gst_element_error!(
src,
gst::ResourceError::Read,
["Failed to read at {}: {}", offset, err.to_string()]
);
return gst::FlowReturn::Error;
}
}
};
if size == 0 {
return Err(FlowError::Eos);
return gst::FlowReturn::Eos;
}
*position += size as u64;
buffer.set_size(size);
Ok(())
gst::FlowReturn::Ok
}
}
impl URIHandlerImpl for HttpSrc {
fn get_uri(&self, _element: &gst::URIHandler) -> Option<String> {
let settings = self.settings.lock().unwrap();
settings.location.as_ref().map(Url::to_string)
}
fn set_uri(&self, element: &gst::URIHandler, uri: Option<String>) -> Result<(), glib::Error> {
let element = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
self.set_location(&element, uri)
}
fn get_uri_type() -> gst::URIType {
gst::URIType::Src
}
fn get_protocols() -> Vec<String> {
vec!["http".to_string(), "https".to_string()]
}
}
impl ObjectSubclass for HttpSrc {
const NAME: &'static str = "RsHttpSrc";
type ParentType = gst_base::BaseSrc;
type Instance = gst::subclass::ElementInstanceStruct<Self>;
type Class = subclass::simple::ClassStruct<Self>;
glib_object_subclass!();
fn new() -> Self {
Self {
cat: gst::DebugCategory::new(
"rshttpsrc",
gst::DebugColorFlags::empty(),
"Rust HTTP source",
),
client: Client::new(),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
}
}
fn type_init(type_: &mut subclass::InitializingType<Self>) {
type_.add_interface::<gst::URIHandler>();
}
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
klass.set_metadata(
"HTTP Source",
"Source/Network/HTTP",
"Read stream from an HTTP/HTTPS location",
"Sebastian Dröge <sebastian@centricular.com>",
);
let caps = gst::Caps::new_any();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
);
klass.add_pad_template(src_pad_template);
klass.install_properties(&PROPERTIES);
}
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(plugin, "rshttpsrc", 256 + 100, HttpSrc::get_type())
}

View file

@ -8,49 +8,29 @@
#![crate_type = "cdylib"]
#[macro_use]
extern crate glib;
#[macro_use]
extern crate gst_plugin;
extern crate gst_plugin_simple;
#[macro_use]
extern crate gstreamer as gst;
extern crate gstreamer_base as gst_base;
extern crate hyperx;
extern crate reqwest;
extern crate url;
use gst_plugin_simple::source::*;
mod httpsrc;
use httpsrc::HttpSrc;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
source_register(
plugin,
SourceInfo {
name: "rshttpsrc".into(),
long_name: "HTTP/HTTPS Source".into(),
description: "Reads HTTP/HTTPS streams".into(),
classification: "Source/File".into(),
author: "Sebastian Dröge <sebastian@centricular.com>".into(),
rank: 256 + 100,
create_instance: HttpSrc::new_boxed,
protocols: vec!["http".into(), "https".into()],
push_only: true,
},
);
Ok(())
httpsrc::register(plugin)
}
plugin_define!(
b"rshttp\0",
b"Rust HTTP Plugin\0",
gst_plugin_define!(
"rshttp",
"Rust HTTP Plugin",
plugin_init,
b"1.0\0",
b"MIT/X11\0",
b"rshttp\0",
b"rshttp\0",
b"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs\0",
b"2016-12-08\0"
"1.0",
"MIT/X11",
"rshttp",
"rshttp",
"https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs",
"2016-12-08"
);