mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 13:31:00 +00:00
Add a plugin to read data from Amazon S3
Moving from https://github.com/ford-prefect/gst-plugin-s3/ to have these plugins in a central location.
This commit is contained in:
parent
c46ec64b03
commit
a7d24506c2
6 changed files with 740 additions and 0 deletions
|
@ -12,6 +12,7 @@ members = [
|
||||||
"gst-plugin-sodium",
|
"gst-plugin-sodium",
|
||||||
"gst-plugin-cdg",
|
"gst-plugin-cdg",
|
||||||
"gst-plugin-rav1e",
|
"gst-plugin-rav1e",
|
||||||
|
"gst-plugin-s3",
|
||||||
]
|
]
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
|
|
23
gst-plugin-s3/Cargo.toml
Normal file
23
gst-plugin-s3/Cargo.toml
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
[package]
|
||||||
|
name = "gst-plugin-s3"
|
||||||
|
version = "0.5.0"
|
||||||
|
authors = ["Arun Raghavan <arun@arunraghavan.net>"]
|
||||||
|
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugin-rs"
|
||||||
|
license = "MIT/Apache-2.0"
|
||||||
|
edition = "2018"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
bytes = "0.4"
|
||||||
|
futures = "0.1"
|
||||||
|
glib = { git = "https://github.com/gtk-rs/glib" }
|
||||||
|
gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing", "v1_12"] }
|
||||||
|
gstreamer-base = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["subclassing", "v1_12"] }
|
||||||
|
rusoto_core = "0.39.0"
|
||||||
|
rusoto_s3 = "0.39.0"
|
||||||
|
url = "1.7"
|
||||||
|
tokio = "0.1"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
name = "gsts3"
|
||||||
|
crate-type = ["cdylib", "rlib"]
|
||||||
|
path = "src/lib.rs"
|
37
gst-plugin-s3/README.md
Normal file
37
gst-plugin-s3/README.md
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
# gst-plugin-s3
|
||||||
|
|
||||||
|
This is a [GStreamer](https://gstreamer.freedesktop.org/) plugin to interact
|
||||||
|
with the [Amazon Simple Storage Service (S3)](https://aws.amazon.com/s3/).
|
||||||
|
|
||||||
|
Currently, a simple source element exists. The eventual plan is to also add a
|
||||||
|
sink, to allow writing out objects directly to S3.
|
||||||
|
|
||||||
|
## AWS Credentials
|
||||||
|
|
||||||
|
AWS credentials are picked up using the mechanism that
|
||||||
|
[rusoto's ChainProvider](http://rusoto.github.io/rusoto/rusoto/struct.ChainProvider.html)
|
||||||
|
uses. At the moment, that is:
|
||||||
|
|
||||||
|
1. Environment variables: `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
|
||||||
|
2. AWS credentials file. Usually located at ~/.aws/credentials.
|
||||||
|
3. IAM instance profile. Will only work if running on an EC2 instance with an instance profile/role.
|
||||||
|
|
||||||
|
An example credentials file might look like:
|
||||||
|
|
||||||
|
```ini
|
||||||
|
[default]
|
||||||
|
aws_access_key_id = ...
|
||||||
|
aws_secret_access_key = ...
|
||||||
|
```
|
||||||
|
|
||||||
|
## s3src
|
||||||
|
|
||||||
|
Reads from a given S3 (region, bucket, object, version?) tuple. The version may
|
||||||
|
be omitted, in which case the default behaviour of fetching the latest version
|
||||||
|
applies.
|
||||||
|
|
||||||
|
```
|
||||||
|
$ gst-launch-1.0 \
|
||||||
|
s3src uri=s3://ap-south-1/my-bucket/my-object-key/which-can-have-slashes?version=my-optional-version !
|
||||||
|
filesink name=my-object.out
|
||||||
|
```
|
34
gst-plugin-s3/src/lib.rs
Normal file
34
gst-plugin-s3/src/lib.rs
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||||
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||||
|
// option. This file may not be copied, modified, or distributed
|
||||||
|
// except according to those terms.
|
||||||
|
|
||||||
|
#![crate_type = "cdylib"]
|
||||||
|
|
||||||
|
#[macro_use]
|
||||||
|
extern crate glib;
|
||||||
|
#[macro_use]
|
||||||
|
extern crate gstreamer as gst;
|
||||||
|
extern crate gstreamer_base as gst_base;
|
||||||
|
|
||||||
|
mod s3src;
|
||||||
|
mod s3url;
|
||||||
|
|
||||||
|
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
|
s3src::register(plugin)
|
||||||
|
}
|
||||||
|
|
||||||
|
gst_plugin_define!(
|
||||||
|
"s3src",
|
||||||
|
"Amazon S3 Plugin",
|
||||||
|
plugin_init,
|
||||||
|
"1.0",
|
||||||
|
"MIT/X11",
|
||||||
|
"s3",
|
||||||
|
"s3",
|
||||||
|
"https://github.com/ford-prefect/gst-plugin-s3",
|
||||||
|
"2017-04-17"
|
||||||
|
);
|
474
gst-plugin-s3/src/s3src.rs
Normal file
474
gst-plugin-s3/src/s3src.rs
Normal file
|
@ -0,0 +1,474 @@
|
||||||
|
// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||||
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||||
|
// option. This file may not be copied, modified, or distributed
|
||||||
|
// except according to those terms.
|
||||||
|
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::sync::oneshot;
|
||||||
|
use futures::{Future, Stream};
|
||||||
|
use rusoto_s3::*;
|
||||||
|
use tokio::runtime;
|
||||||
|
|
||||||
|
use glib::prelude::*;
|
||||||
|
use glib::subclass;
|
||||||
|
use glib::subclass::prelude::*;
|
||||||
|
|
||||||
|
use gst;
|
||||||
|
use gst::subclass::prelude::*;
|
||||||
|
|
||||||
|
use gst_base;
|
||||||
|
use gst_base::prelude::*;
|
||||||
|
use gst_base::subclass::prelude::*;
|
||||||
|
|
||||||
|
use crate::s3url::*;
|
||||||
|
|
||||||
|
enum StreamingState {
|
||||||
|
Stopped,
|
||||||
|
Started {
|
||||||
|
url: GstS3Url,
|
||||||
|
client: S3Client,
|
||||||
|
size: u64,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct S3Src {
|
||||||
|
url: Mutex<Option<GstS3Url>>,
|
||||||
|
state: Mutex<StreamingState>,
|
||||||
|
cat: gst::DebugCategory,
|
||||||
|
runtime: runtime::Runtime,
|
||||||
|
canceller: Mutex<Option<oneshot::Sender<Bytes>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
static PROPERTIES: [subclass::Property; 1] = [subclass::Property("uri", |name| {
|
||||||
|
glib::ParamSpec::string(
|
||||||
|
name,
|
||||||
|
"URI",
|
||||||
|
"The S3 object URI",
|
||||||
|
None,
|
||||||
|
glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
|
||||||
|
)
|
||||||
|
})];
|
||||||
|
|
||||||
|
impl S3Src {
|
||||||
|
fn cancel(&self) {
|
||||||
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
|
|
||||||
|
if let Some(_) = canceller.take() {
|
||||||
|
/* We don't do anything, the Sender will be dropped, and that will cause the
|
||||||
|
* Receiver to be cancelled */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn wait<F>(&self, future: F) -> Result<F::Item, Option<gst::ErrorMessage>>
|
||||||
|
where
|
||||||
|
F: Send + Future<Error = gst::ErrorMessage> + 'static,
|
||||||
|
F::Item: Send,
|
||||||
|
{
|
||||||
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
|
let (sender, receiver) = oneshot::channel::<Bytes>();
|
||||||
|
|
||||||
|
canceller.replace(sender);
|
||||||
|
|
||||||
|
let unlock_error = gst_error_msg!(gst::ResourceError::Busy, ["unlock"]);
|
||||||
|
|
||||||
|
let res = oneshot::spawn(future, &self.runtime.executor())
|
||||||
|
.select(receiver.then(|_| Err(unlock_error.clone())))
|
||||||
|
.wait()
|
||||||
|
.map(|v| v.0)
|
||||||
|
.map_err(|err| {
|
||||||
|
if err.0 == unlock_error {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(err.0)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
/* Clear out the canceller */
|
||||||
|
*canceller = None;
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect(self: &S3Src, url: &GstS3Url) -> Result<S3Client, gst::ErrorMessage> {
|
||||||
|
Ok(S3Client::new(url.region.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_uri(
|
||||||
|
self: &S3Src,
|
||||||
|
_: &gst_base::BaseSrc,
|
||||||
|
url_str: Option<String>,
|
||||||
|
) -> Result<(), glib::Error> {
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let StreamingState::Started { .. } = *state {
|
||||||
|
return Err(gst::Error::new(
|
||||||
|
gst::URIError::BadState,
|
||||||
|
"Cannot set URI on a started s3src",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut url = self.url.lock().unwrap();
|
||||||
|
|
||||||
|
match url_str {
|
||||||
|
Some(s) => match parse_s3_url(&s) {
|
||||||
|
Ok(s3url) => {
|
||||||
|
*url = Some(s3url);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(_) => Err(gst::Error::new(
|
||||||
|
gst::URIError::BadUri,
|
||||||
|
"Could not parse URI",
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
*url = None;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn head(
|
||||||
|
self: &S3Src,
|
||||||
|
src: &gst_base::BaseSrc,
|
||||||
|
client: &S3Client,
|
||||||
|
url: &GstS3Url,
|
||||||
|
) -> Result<u64, gst::ErrorMessage> {
|
||||||
|
let request = HeadObjectRequest {
|
||||||
|
bucket: url.bucket.clone(),
|
||||||
|
key: url.object.clone(),
|
||||||
|
version_id: url.version.clone(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = client.head_object(request);
|
||||||
|
|
||||||
|
let output = self
|
||||||
|
.wait(response.map_err(|err| {
|
||||||
|
gst_error_msg!(
|
||||||
|
gst::ResourceError::NotFound,
|
||||||
|
["Failed to HEAD object: {}", err]
|
||||||
|
)
|
||||||
|
}))
|
||||||
|
.map_err(|err| {
|
||||||
|
err.unwrap_or(gst_error_msg!(
|
||||||
|
gst::LibraryError::Failed,
|
||||||
|
["Interrupted during start"]
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if let Some(size) = output.content_length {
|
||||||
|
gst_info!(
|
||||||
|
self.cat,
|
||||||
|
obj: src,
|
||||||
|
"HEAD success, content length = {}",
|
||||||
|
size
|
||||||
|
);
|
||||||
|
Ok(size as u64)
|
||||||
|
} else {
|
||||||
|
Err(gst_error_msg!(
|
||||||
|
gst::ResourceError::Read,
|
||||||
|
["Failed to get content length"]
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Returns the bytes, Some(error) if one occured, or a None error if interrupted */
|
||||||
|
fn get(
|
||||||
|
self: &S3Src,
|
||||||
|
src: &gst_base::BaseSrc,
|
||||||
|
offset: u64,
|
||||||
|
length: u64,
|
||||||
|
) -> Result<Bytes, Option<gst::ErrorMessage>> {
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
let (url, client) = match *state {
|
||||||
|
StreamingState::Started {
|
||||||
|
ref url,
|
||||||
|
ref client,
|
||||||
|
..
|
||||||
|
} => (url, client),
|
||||||
|
StreamingState::Stopped => {
|
||||||
|
return Err(Some(gst_error_msg!(
|
||||||
|
gst::LibraryError::Failed,
|
||||||
|
["Cannot GET before start()"]
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let request = GetObjectRequest {
|
||||||
|
bucket: url.bucket.clone(),
|
||||||
|
key: url.object.clone(),
|
||||||
|
range: Some(format!("bytes={}-{}", offset, offset + length - 1)),
|
||||||
|
version_id: url.version.clone(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
gst_debug!(
|
||||||
|
self.cat,
|
||||||
|
obj: src,
|
||||||
|
"Requesting range: {}-{}",
|
||||||
|
offset,
|
||||||
|
offset + length - 1
|
||||||
|
);
|
||||||
|
|
||||||
|
let response = client.get_object(request);
|
||||||
|
|
||||||
|
/* Drop the state lock now that we're done with it and need the next part to be
|
||||||
|
* interruptible */
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
let output = self.wait(response.map_err(|err| {
|
||||||
|
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
|
||||||
|
}))?;
|
||||||
|
|
||||||
|
gst_debug!(
|
||||||
|
self.cat,
|
||||||
|
obj: src,
|
||||||
|
"Read {} bytes",
|
||||||
|
output.content_length.unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
self.wait(
|
||||||
|
output.body.unwrap().concat2().map_err(|err| {
|
||||||
|
gst_error_msg!(gst::ResourceError::Read, ["Could not read: {}", err])
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ObjectSubclass for S3Src {
|
||||||
|
const NAME: &'static str = "S3Src";
|
||||||
|
type ParentType = gst_base::BaseSrc;
|
||||||
|
type Instance = gst::subclass::ElementInstanceStruct<Self>;
|
||||||
|
type Class = subclass::simple::ClassStruct<Self>;
|
||||||
|
|
||||||
|
glib_object_subclass!();
|
||||||
|
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
url: Mutex::new(None),
|
||||||
|
state: Mutex::new(StreamingState::Stopped),
|
||||||
|
cat: gst::DebugCategory::new(
|
||||||
|
"s3src",
|
||||||
|
gst::DebugColorFlags::empty(),
|
||||||
|
Some("Amazon S3 Source"),
|
||||||
|
),
|
||||||
|
runtime: runtime::Builder::new()
|
||||||
|
.core_threads(1)
|
||||||
|
.name_prefix("gst-s3-tokio")
|
||||||
|
.build()
|
||||||
|
.unwrap(),
|
||||||
|
canceller: Mutex::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn type_init(typ: &mut subclass::InitializingType<Self>) {
|
||||||
|
typ.add_interface::<gst::URIHandler>();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
|
||||||
|
klass.set_metadata(
|
||||||
|
"Amazon S3 source",
|
||||||
|
"Source/Network",
|
||||||
|
"Reads an object from Amazon S3",
|
||||||
|
"Arun Raghavan <arun@arunraghavan.net>",
|
||||||
|
);
|
||||||
|
|
||||||
|
let caps = gst::Caps::new_any();
|
||||||
|
let src_pad_template = gst::PadTemplate::new(
|
||||||
|
"src",
|
||||||
|
gst::PadDirection::Src,
|
||||||
|
gst::PadPresence::Always,
|
||||||
|
&caps,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
klass.add_pad_template(src_pad_template);
|
||||||
|
|
||||||
|
klass.install_properties(&PROPERTIES);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ObjectImpl for S3Src {
|
||||||
|
glib_object_impl!();
|
||||||
|
|
||||||
|
fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
|
||||||
|
let prop = &PROPERTIES[id as usize];
|
||||||
|
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
|
||||||
|
|
||||||
|
match *prop {
|
||||||
|
subclass::Property("uri", ..) => {
|
||||||
|
self.set_uri(basesrc, value.get()).unwrap_or_else(|err| {
|
||||||
|
gst_error!(self.cat, obj: basesrc, "Could not set URI: {}", err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_property(&self, _: &glib::Object, id: usize) -> Result<glib::Value, ()> {
|
||||||
|
let prop = &PROPERTIES[id as usize];
|
||||||
|
|
||||||
|
match *prop {
|
||||||
|
subclass::Property("uri", ..) => {
|
||||||
|
let url = match *self.url.lock().unwrap() {
|
||||||
|
Some(ref url) => url.to_string(),
|
||||||
|
None => "".to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(url.to_value())
|
||||||
|
}
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn constructed(&self, obj: &glib::Object) {
|
||||||
|
self.parent_constructed(obj);
|
||||||
|
|
||||||
|
let basesrc = obj.downcast_ref::<gst_base::BaseSrc>().unwrap();
|
||||||
|
basesrc.set_format(gst::Format::Bytes);
|
||||||
|
/* Set a larger default blocksize to make read more efficient */
|
||||||
|
basesrc.set_blocksize(262144);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ElementImpl for S3Src {
|
||||||
|
// No overrides
|
||||||
|
}
|
||||||
|
|
||||||
|
impl URIHandlerImpl for S3Src {
|
||||||
|
fn get_uri(&self, _: &gst::URIHandler) -> Option<String> {
|
||||||
|
self.url.lock().unwrap().as_ref().map(|s| s.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_uri(&self, element: &gst::URIHandler, uri: Option<String>) -> Result<(), glib::Error> {
|
||||||
|
let basesrc = element.dynamic_cast_ref::<gst_base::BaseSrc>().unwrap();
|
||||||
|
self.set_uri(basesrc, uri)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_uri_type() -> gst::URIType {
|
||||||
|
gst::URIType::Src
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_protocols() -> Vec<String> {
|
||||||
|
vec!["s3".to_string()]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BaseSrcImpl for S3Src {
|
||||||
|
fn is_seekable(&self, _: &gst_base::BaseSrc) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_size(&self, _: &gst_base::BaseSrc) -> Option<u64> {
|
||||||
|
match *self.state.lock().unwrap() {
|
||||||
|
StreamingState::Stopped => None,
|
||||||
|
StreamingState::Started { size, .. } => Some(size),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start(&self, src: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let StreamingState::Started { .. } = *state {
|
||||||
|
unreachable!("S3Src is already started");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Drop the lock as self.head() needs it */
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
let s3url = match *self.url.lock().unwrap() {
|
||||||
|
Some(ref url) => url.clone(),
|
||||||
|
None => {
|
||||||
|
return Err(gst_error_msg!(
|
||||||
|
gst::ResourceError::Settings,
|
||||||
|
["Cannot start without a URL being set"]
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let s3client = self.connect(&s3url)?;
|
||||||
|
|
||||||
|
let size = self.head(src, &s3client, &s3url)?;
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
*state = StreamingState::Started {
|
||||||
|
url: s3url,
|
||||||
|
client: s3client,
|
||||||
|
size: size,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let StreamingState::Stopped = *state {
|
||||||
|
unreachable!("Cannot stop before start");
|
||||||
|
}
|
||||||
|
|
||||||
|
*state = StreamingState::Stopped;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query(&self, src: &gst_base::BaseSrc, query: &mut gst::QueryRef) -> bool {
|
||||||
|
match query.view_mut() {
|
||||||
|
gst::QueryView::Scheduling(ref mut q) => {
|
||||||
|
q.set(
|
||||||
|
gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
|
||||||
|
1,
|
||||||
|
-1,
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
q.add_scheduling_modes(&[gst::PadMode::Push, gst::PadMode::Pull]);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
BaseSrcImplExt::parent_query(self, src, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create(
|
||||||
|
&self,
|
||||||
|
src: &gst_base::BaseSrc,
|
||||||
|
offset: u64,
|
||||||
|
length: u32,
|
||||||
|
) -> Result<gst::Buffer, gst::FlowError> {
|
||||||
|
// FIXME: sanity check on offset and length
|
||||||
|
let data = self.get(src, offset, u64::from(length));
|
||||||
|
|
||||||
|
match data {
|
||||||
|
/* Got data */
|
||||||
|
Ok(bytes) => Ok(gst::Buffer::from_slice(bytes)),
|
||||||
|
/* Interrupted */
|
||||||
|
Err(None) => Err(gst::FlowError::Flushing),
|
||||||
|
/* Actual Error */
|
||||||
|
Err(Some(err)) => {
|
||||||
|
gst_error!(self.cat, obj: src, "Could not GET: {}", err);
|
||||||
|
Err(gst::FlowError::Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* FIXME: implement */
|
||||||
|
fn do_seek(&self, _: &gst_base::BaseSrc, _: &mut gst::Segment) -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unlock(&self, _: &gst_base::BaseSrc) -> Result<(), gst::ErrorMessage> {
|
||||||
|
self.cancel();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
|
gst::Element::register(Some(plugin), "s3src", 0, S3Src::get_type())
|
||||||
|
}
|
171
gst-plugin-s3/src/s3url.rs
Normal file
171
gst-plugin-s3/src/s3url.rs
Normal file
|
@ -0,0 +1,171 @@
|
||||||
|
// Copyright (C) 2017 Author: Arun Raghavan <arun@arunraghavan.net>
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||||
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||||
|
// option. This file may not be copied, modified, or distributed
|
||||||
|
// except according to those terms.
|
||||||
|
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use rusoto_core::Region;
|
||||||
|
use url::percent_encoding::{percent_decode, percent_encode, DEFAULT_ENCODE_SET};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct GstS3Url {
|
||||||
|
pub region: Region,
|
||||||
|
pub bucket: String,
|
||||||
|
pub object: String,
|
||||||
|
pub version: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ToString for GstS3Url {
|
||||||
|
fn to_string(&self) -> String {
|
||||||
|
format!(
|
||||||
|
"s3://{}/{}/{}{}",
|
||||||
|
self.region.name(),
|
||||||
|
self.bucket,
|
||||||
|
percent_encode(self.object.as_bytes(), DEFAULT_ENCODE_SET),
|
||||||
|
if self.version.is_some() {
|
||||||
|
format!("?version={}", self.version.clone().unwrap())
|
||||||
|
} else {
|
||||||
|
"".to_string()
|
||||||
|
}
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_s3_url(url_str: &str) -> Result<GstS3Url, String> {
|
||||||
|
let url = Url::parse(url_str).or_else(|err| Err(format!("Parse error: {}", err)))?;
|
||||||
|
|
||||||
|
if url.scheme() != "s3" {
|
||||||
|
return Err(format!("Unsupported URI '{}'", url.scheme()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if !url.has_host() {
|
||||||
|
return Err(format!("Invalid host in uri '{}'", url));
|
||||||
|
}
|
||||||
|
|
||||||
|
let host = url.host_str().unwrap();
|
||||||
|
let region = Region::from_str(host).or_else(|_| Err(format!("Invalid region '{}'", host)))?;
|
||||||
|
|
||||||
|
let mut path = url
|
||||||
|
.path_segments()
|
||||||
|
.ok_or_else(|| format!("Invalid uri '{}'", url))?;
|
||||||
|
|
||||||
|
let bucket = path.next().unwrap().to_string();
|
||||||
|
|
||||||
|
let o = path
|
||||||
|
.next()
|
||||||
|
.ok_or_else(|| format!("Invalid empty object/bucket '{}'", url))?;
|
||||||
|
|
||||||
|
let mut object = percent_decode(o.as_bytes())
|
||||||
|
.decode_utf8()
|
||||||
|
.unwrap()
|
||||||
|
.to_string();
|
||||||
|
if o.is_empty() {
|
||||||
|
return Err(format!("Invalid empty object/bucket '{}'", url));
|
||||||
|
}
|
||||||
|
|
||||||
|
object = path.fold(object, |o, p| format!("{}/{}", o, p));
|
||||||
|
|
||||||
|
let mut q = url.query_pairs();
|
||||||
|
let v = q.next();
|
||||||
|
let version;
|
||||||
|
|
||||||
|
match v {
|
||||||
|
Some((ref k, ref v)) if k == "version" => version = Some((*v).to_string()),
|
||||||
|
None => version = None,
|
||||||
|
Some(_) => return Err("Bad query, only 'version' is supported".to_owned()),
|
||||||
|
}
|
||||||
|
|
||||||
|
if q.next() != None {
|
||||||
|
return Err("Extra query terms, only 'version' is supported".to_owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(GstS3Url {
|
||||||
|
region: region,
|
||||||
|
bucket: bucket,
|
||||||
|
object: object,
|
||||||
|
version: version,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn cannot_be_base() {
|
||||||
|
assert!(parse_s3_url("data:something").is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn invalid_scheme() {
|
||||||
|
assert!(parse_s3_url("file:///dev/zero").is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bad_region() {
|
||||||
|
assert!(parse_s3_url("s3://atlantis-1/i-hope-we/dont-find-this").is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn no_bucket() {
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1").is_err());
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1/").is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn no_object() {
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1/my-bucket").is_err());
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1/my-bucket/").is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn valid_simple() {
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object").is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn extraneous_query() {
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?foo=bar").is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn valid_version() {
|
||||||
|
assert!(parse_s3_url("s3://ap-south-1/my-bucket/my-object?version=one").is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn trailing_slash() {
|
||||||
|
// Slashes are valid at the end of the object key
|
||||||
|
assert_eq!(
|
||||||
|
parse_s3_url("s3://ap-south-1/my-bucket/my-object/")
|
||||||
|
.unwrap()
|
||||||
|
.object,
|
||||||
|
"my-object/"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn percent_encoding() {
|
||||||
|
assert_eq!(
|
||||||
|
parse_s3_url("s3://ap-south-1/my-bucket/my%20object")
|
||||||
|
.unwrap()
|
||||||
|
.object,
|
||||||
|
"my object"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn percent_decoding() {
|
||||||
|
assert_eq!(
|
||||||
|
parse_s3_url("s3://ap-south-1/my-bucket/my object")
|
||||||
|
.unwrap()
|
||||||
|
.to_string(),
|
||||||
|
"s3://ap-south-1/my-bucket/my%20object"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue