mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-26 05:21:00 +00:00
aws: s3: Add a new awss3putobjectsink
When streaming small amounts of data, using awss3sink might not be a good idea, as we need to accumulate at least 5 MB of data for a multipart upload (or we flush on EOS). The alternative, while inefficient, is to do a complete PutObject of _all_ the data periodically so as to not lose data in case of a pipeline failure. This element makes a start on this idea by doing a PutObject for every buffer. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1337>
This commit is contained in:
parent
0a27b9e6d9
commit
a54b2dd39e
4 changed files with 718 additions and 45 deletions
|
@ -1,4 +1,6 @@
|
|||
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
|
||||
// Copyright (C) 2023 Asymptotic Inc
|
||||
// Author: Arun Raghavan <arun@asymptotic.io>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||
|
@ -9,7 +11,8 @@
|
|||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
|
||||
mod imp;
|
||||
mod multipartsink;
|
||||
mod putobjectsink;
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
|
||||
#[repr(u32)]
|
||||
|
@ -27,7 +30,11 @@ pub(crate) enum OnError {
|
|||
}
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct S3Sink(ObjectSubclass<imp::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
|
||||
pub struct S3Sink(ObjectSubclass<multipartsink::S3Sink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
|
||||
}
|
||||
|
||||
glib::wrapper! {
|
||||
pub struct S3PutObjectSink(ObjectSubclass<putobjectsink::S3PutObjectSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
|
||||
}
|
||||
|
||||
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||
|
@ -43,5 +50,12 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
|||
"awss3sink",
|
||||
gst::Rank::PRIMARY,
|
||||
S3Sink::static_type(),
|
||||
)?;
|
||||
gst::Element::register(
|
||||
Some(plugin),
|
||||
"awss3putobjectsink",
|
||||
// This element should not be autoplugged as it is only useful for specific use cases
|
||||
gst::Rank::NONE,
|
||||
S3PutObjectSink::static_type(),
|
||||
)
|
||||
}
|
||||
|
|
593
net/aws/src/s3sink/putobjectsink.rs
Normal file
593
net/aws/src/s3sink/putobjectsink.rs
Normal file
|
@ -0,0 +1,593 @@
|
|||
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
|
||||
// Copyright (C) 2023 Asymptotic Inc
|
||||
// Author: Arun Raghavan <arun@asymptotic.io>
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||
// <https://mozilla.org/MPL/2.0/>.
|
||||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use gst::glib;
|
||||
use gst::prelude::*;
|
||||
use gst::subclass::prelude::*;
|
||||
use gst_base::subclass::prelude::*;
|
||||
|
||||
use aws_sdk_s3::{
|
||||
config::{self, retry::RetryConfig, Credentials, Region},
|
||||
operation::put_object::builders::PutObjectFluentBuilder,
|
||||
primitives::ByteStream,
|
||||
Client,
|
||||
};
|
||||
|
||||
use futures::future;
|
||||
use gst::glib::once_cell::sync::Lazy;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::From;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::s3url::*;
|
||||
use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
|
||||
|
||||
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
|
||||
|
||||
// General setting for create / abort requests
|
||||
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
||||
|
||||
struct Started {
|
||||
client: Client,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Started {
|
||||
pub fn new(client: Client, buffer: Vec<u8>) -> Started {
|
||||
Started { client, buffer }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
enum State {
|
||||
#[default]
|
||||
Stopped,
|
||||
Started(Started),
|
||||
}
|
||||
|
||||
struct Settings {
|
||||
region: Region,
|
||||
bucket: Option<String>,
|
||||
key: Option<String>,
|
||||
content_type: Option<String>,
|
||||
content_disposition: Option<String>,
|
||||
access_key: Option<String>,
|
||||
secret_access_key: Option<String>,
|
||||
session_token: Option<String>,
|
||||
metadata: Option<gst::Structure>,
|
||||
retry_attempts: u32,
|
||||
request_timeout: Duration,
|
||||
endpoint_uri: Option<String>,
|
||||
}
|
||||
|
||||
impl Settings {
|
||||
fn to_uri(&self) -> String {
|
||||
GstS3Url {
|
||||
region: self.region.clone(),
|
||||
bucket: self.bucket.clone().unwrap(),
|
||||
object: self.key.clone().unwrap(),
|
||||
version: None,
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
|
||||
fn to_metadata(&self, imp: &S3PutObjectSink) -> Option<HashMap<String, String>> {
|
||||
self.metadata.as_ref().map(|structure| {
|
||||
let mut hash = HashMap::new();
|
||||
|
||||
for (key, value) in structure.iter() {
|
||||
if let Ok(Ok(value_str)) = value.transform::<String>().map(|v| v.get()) {
|
||||
gst::log!(CAT, imp: imp, "metadata '{}' -> '{}'", key, value_str);
|
||||
hash.insert(key.to_string(), value_str);
|
||||
} else {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
imp: imp,
|
||||
"Failed to convert metadata '{}' to string ('{:?}')",
|
||||
key,
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
hash
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Settings {
|
||||
region: Region::new("us-west-2"),
|
||||
bucket: None,
|
||||
key: None,
|
||||
content_type: None,
|
||||
content_disposition: None,
|
||||
access_key: None,
|
||||
secret_access_key: None,
|
||||
session_token: None,
|
||||
metadata: None,
|
||||
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
|
||||
request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
|
||||
endpoint_uri: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct S3PutObjectSink {
|
||||
url: Mutex<Option<GstS3Url>>,
|
||||
settings: Mutex<Settings>,
|
||||
state: Mutex<State>,
|
||||
canceller: Mutex<Option<future::AbortHandle>>,
|
||||
}
|
||||
|
||||
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||
gst::DebugCategory::new(
|
||||
"awss3putobjectsink",
|
||||
gst::DebugColorFlags::empty(),
|
||||
Some("Amazon S3 PutObject Sink"),
|
||||
)
|
||||
});
|
||||
|
||||
impl S3PutObjectSink {
|
||||
fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
let put_object_req = self.create_put_object_request();
|
||||
|
||||
let put_object_req_future = put_object_req.send();
|
||||
let _output =
|
||||
s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match err {
|
||||
WaitError::FutureError(err) => Some(gst::error_msg!(
|
||||
gst::ResourceError::OpenWrite,
|
||||
["Failed to upload part: {}", err]
|
||||
)),
|
||||
WaitError::Cancelled => None,
|
||||
})?;
|
||||
|
||||
gst::debug!(CAT, imp: self, "Uploaded complete");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_put_object_request(&self) -> PutObjectFluentBuilder {
|
||||
let url = self.url.lock().unwrap();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let state = self.state.lock().unwrap();
|
||||
let state = match *state {
|
||||
State::Started(ref started_state) => started_state,
|
||||
State::Stopped => {
|
||||
unreachable!("Element should be started");
|
||||
}
|
||||
};
|
||||
|
||||
let body = Some(ByteStream::from(state.buffer.clone()));
|
||||
|
||||
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
|
||||
let key = Some(url.as_ref().unwrap().object.to_owned());
|
||||
let metadata = settings.to_metadata(self);
|
||||
|
||||
let client = &state.client;
|
||||
|
||||
client
|
||||
.put_object()
|
||||
.set_body(body)
|
||||
.set_bucket(bucket)
|
||||
.set_key(key)
|
||||
.set_metadata(metadata)
|
||||
}
|
||||
|
||||
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
if let State::Started { .. } = *state {
|
||||
unreachable!("Element should be started");
|
||||
}
|
||||
|
||||
let s3url = {
|
||||
let url = self.url.lock().unwrap();
|
||||
match *url {
|
||||
Some(ref url) => url.clone(),
|
||||
None => {
|
||||
return Err(gst::error_msg!(
|
||||
gst::ResourceError::Settings,
|
||||
["Cannot start without a URL being set"]
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let timeout_config = s3utils::timeout_config(settings.request_timeout);
|
||||
|
||||
let cred = match (
|
||||
settings.access_key.as_ref(),
|
||||
settings.secret_access_key.as_ref(),
|
||||
) {
|
||||
(Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
|
||||
access_key.clone(),
|
||||
secret_access_key.clone(),
|
||||
settings.session_token.clone(),
|
||||
None,
|
||||
"aws-s3-putobject-sink",
|
||||
)),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let sdk_config =
|
||||
s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred)
|
||||
.map_err(|err| match err {
|
||||
WaitError::FutureError(err) => gst::error_msg!(
|
||||
gst::ResourceError::OpenWrite,
|
||||
["Failed to create SDK config: {}", err]
|
||||
),
|
||||
WaitError::Cancelled => {
|
||||
gst::error_msg!(
|
||||
gst::LibraryError::Failed,
|
||||
["SDK config request interrupted during start"]
|
||||
)
|
||||
}
|
||||
})?;
|
||||
|
||||
let config_builder = config::Builder::from(&sdk_config)
|
||||
.retry_config(RetryConfig::standard().with_max_attempts(settings.retry_attempts));
|
||||
|
||||
let config = if let Some(ref uri) = settings.endpoint_uri {
|
||||
config_builder.endpoint_url(uri).build()
|
||||
} else {
|
||||
config_builder.build()
|
||||
};
|
||||
|
||||
let client = Client::from_conf(config);
|
||||
|
||||
*state = State::Started(Started::new(client, Vec::new()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cancel(&self) {
|
||||
let mut canceller = self.canceller.lock().unwrap();
|
||||
|
||||
if let Some(c) = canceller.take() {
|
||||
c.abort()
|
||||
};
|
||||
}
|
||||
|
||||
fn set_uri(self: &S3PutObjectSink, url_str: Option<&str>) -> Result<(), glib::Error> {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
if let State::Started { .. } = *state {
|
||||
return Err(glib::Error::new(
|
||||
gst::URIError::BadState,
|
||||
"Cannot set URI on a started s3sink",
|
||||
));
|
||||
}
|
||||
|
||||
let mut url = self.url.lock().unwrap();
|
||||
|
||||
if url_str.is_none() {
|
||||
*url = None;
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
gst::debug!(CAT, imp: self, "Setting uri to {:?}", url_str);
|
||||
|
||||
let url_str = url_str.unwrap();
|
||||
match parse_s3_url(url_str) {
|
||||
Ok(s3url) => {
|
||||
*url = Some(s3url);
|
||||
Ok(())
|
||||
}
|
||||
Err(_) => Err(glib::Error::new(
|
||||
gst::URIError::BadUri,
|
||||
"Could not parse URI",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[glib::object_subclass]
|
||||
impl ObjectSubclass for S3PutObjectSink {
|
||||
const NAME: &'static str = "GstAwsS3PutObjectSink";
|
||||
type Type = super::S3PutObjectSink;
|
||||
type ParentType = gst_base::BaseSink;
|
||||
}
|
||||
|
||||
impl ObjectImpl for S3PutObjectSink {
|
||||
fn properties() -> &'static [glib::ParamSpec] {
|
||||
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||
vec![
|
||||
glib::ParamSpecString::builder("bucket")
|
||||
.nick("S3 Bucket")
|
||||
.blurb("The bucket of the file to write")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("key")
|
||||
.nick("S3 Key")
|
||||
.blurb("The key of the file to write")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("region")
|
||||
.nick("AWS Region")
|
||||
.blurb("An AWS region (e.g. eu-west-2).")
|
||||
.default_value(Some("us-west-2"))
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("uri")
|
||||
.nick("URI")
|
||||
.blurb("The S3 object URI")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("access-key")
|
||||
.nick("Access Key")
|
||||
.blurb("AWS Access Key")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("secret-access-key")
|
||||
.nick("Secret Access Key")
|
||||
.blurb("AWS Secret Access Key")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("session-token")
|
||||
.nick("Session Token")
|
||||
.blurb("AWS temporary Session Token from STS")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecBoxed::builder::<gst::Structure>("metadata")
|
||||
.nick("Metadata")
|
||||
.blurb("A map of metadata to store with the object in S3; field values need to be convertible to strings.")
|
||||
.mutable_ready()
|
||||
.build(),
|
||||
glib::ParamSpecUInt::builder("retry-attempts")
|
||||
.nick("Retry attempts")
|
||||
.blurb("Number of times AWS SDK attempts a request before abandoning the request")
|
||||
.minimum(1)
|
||||
.maximum(10)
|
||||
.default_value(DEFAULT_RETRY_ATTEMPTS)
|
||||
.build(),
|
||||
glib::ParamSpecInt64::builder("request-timeout")
|
||||
.nick("Request timeout")
|
||||
.blurb("Timeout for general S3 requests (in ms, set to -1 for infinity)")
|
||||
.minimum(-1)
|
||||
.default_value(DEFAULT_REQUEST_TIMEOUT_MSEC as i64)
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("endpoint-uri")
|
||||
.nick("S3 endpoint URI")
|
||||
.blurb("The S3 endpoint URI to use")
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("content-type")
|
||||
.nick("content-type")
|
||||
.blurb("Content-Type header to set for uploaded object")
|
||||
.build(),
|
||||
glib::ParamSpecString::builder("content-disposition")
|
||||
.nick("content-disposition")
|
||||
.blurb("Content-Disposition header to set for uploaded object")
|
||||
.build(),
|
||||
]
|
||||
});
|
||||
|
||||
PROPERTIES.as_ref()
|
||||
}
|
||||
|
||||
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||
let mut settings = self.settings.lock().unwrap();
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Setting property '{}' to '{:?}'",
|
||||
pspec.name(),
|
||||
value
|
||||
);
|
||||
|
||||
match pspec.name() {
|
||||
"bucket" => {
|
||||
settings.bucket = value
|
||||
.get::<Option<String>>()
|
||||
.expect("type checked upstream");
|
||||
if settings.key.is_some() {
|
||||
let _ = self.set_uri(Some(&settings.to_uri()));
|
||||
}
|
||||
}
|
||||
"key" => {
|
||||
settings.key = value
|
||||
.get::<Option<String>>()
|
||||
.expect("type checked upstream");
|
||||
if settings.bucket.is_some() {
|
||||
let _ = self.set_uri(Some(&settings.to_uri()));
|
||||
}
|
||||
}
|
||||
"region" => {
|
||||
let region = value.get::<String>().expect("type checked upstream");
|
||||
settings.region = Region::new(region);
|
||||
if settings.key.is_some() && settings.bucket.is_some() {
|
||||
let _ = self.set_uri(Some(&settings.to_uri()));
|
||||
}
|
||||
}
|
||||
"uri" => {
|
||||
let _ = self.set_uri(value.get().expect("type checked upstream"));
|
||||
}
|
||||
"access-key" => {
|
||||
settings.access_key = value.get().expect("type checked upstream");
|
||||
}
|
||||
"secret-access-key" => {
|
||||
settings.secret_access_key = value.get().expect("type checked upstream");
|
||||
}
|
||||
"session-token" => {
|
||||
settings.session_token = value.get().expect("type checked upstream");
|
||||
}
|
||||
"metadata" => {
|
||||
settings.metadata = value.get().expect("type checked upstream");
|
||||
}
|
||||
"retry-attempts" => {
|
||||
settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
|
||||
}
|
||||
"request-timeout" => {
|
||||
settings.request_timeout =
|
||||
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
||||
}
|
||||
"endpoint-uri" => {
|
||||
settings.endpoint_uri = value
|
||||
.get::<Option<String>>()
|
||||
.expect("type checked upstream");
|
||||
if settings.key.is_some() && settings.bucket.is_some() {
|
||||
let _ = self.set_uri(Some(&settings.to_uri()));
|
||||
}
|
||||
}
|
||||
"content-type" => {
|
||||
settings.content_type = value
|
||||
.get::<Option<String>>()
|
||||
.expect("type checked upstream");
|
||||
}
|
||||
"content-disposition" => {
|
||||
settings.content_disposition = value
|
||||
.get::<Option<String>>()
|
||||
.expect("type checked upstream");
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
match pspec.name() {
|
||||
"key" => settings.key.to_value(),
|
||||
"bucket" => settings.bucket.to_value(),
|
||||
"region" => settings.region.to_string().to_value(),
|
||||
"uri" => {
|
||||
let url = self.url.lock().unwrap();
|
||||
let url = match *url {
|
||||
Some(ref url) => url.to_string(),
|
||||
None => "".to_string(),
|
||||
};
|
||||
|
||||
url.to_value()
|
||||
}
|
||||
"access-key" => settings.access_key.to_value(),
|
||||
"secret-access-key" => settings.secret_access_key.to_value(),
|
||||
"session-token" => settings.session_token.to_value(),
|
||||
"metadata" => settings.metadata.to_value(),
|
||||
"retry-attempts" => settings.retry_attempts.to_value(),
|
||||
"request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
|
||||
"endpoint-uri" => settings.endpoint_uri.to_value(),
|
||||
"content-type" => settings.content_type.to_value(),
|
||||
"content-disposition" => settings.content_disposition.to_value(),
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl GstObjectImpl for S3PutObjectSink {}
|
||||
|
||||
impl ElementImpl for S3PutObjectSink {
|
||||
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||
gst::subclass::ElementMetadata::new(
|
||||
"Amazon S3 PutObject sink",
|
||||
"Source/Network",
|
||||
"Writes an object to Amazon S3 using PutObject (mostly useful for small files)",
|
||||
"Arun Raghavan <arun@asymptotic.io>",
|
||||
)
|
||||
});
|
||||
|
||||
Some(&*ELEMENT_METADATA)
|
||||
}
|
||||
|
||||
fn pad_templates() -> &'static [gst::PadTemplate] {
|
||||
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
|
||||
let caps = gst::Caps::new_any();
|
||||
let sink_pad_template = gst::PadTemplate::new(
|
||||
"sink",
|
||||
gst::PadDirection::Sink,
|
||||
gst::PadPresence::Always,
|
||||
&caps,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
vec![sink_pad_template]
|
||||
});
|
||||
|
||||
PAD_TEMPLATES.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl BaseSinkImpl for S3PutObjectSink {
|
||||
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
||||
self.start()
|
||||
}
|
||||
|
||||
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
*state = State::Stopped;
|
||||
gst::info!(CAT, imp: self, "Stopped");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
let started_state = match *state {
|
||||
State::Started(ref mut s) => s,
|
||||
State::Stopped => {
|
||||
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
|
||||
return Err(gst::FlowError::Error);
|
||||
}
|
||||
};
|
||||
|
||||
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
||||
let map = buffer.map_readable().map_err(|_| {
|
||||
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
||||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
started_state.buffer.extend_from_slice(map.as_slice());
|
||||
drop(state);
|
||||
|
||||
match self.flush_buffer() {
|
||||
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
||||
Err(err) => match err {
|
||||
Some(error_message) => {
|
||||
gst::error!(CAT, imp: self, "Upload failed: {}", error_message);
|
||||
self.post_error_message(error_message);
|
||||
Err(gst::FlowError::Error)
|
||||
}
|
||||
_ => {
|
||||
gst::info!(CAT, imp: self, "Upload interrupted. Flushing...");
|
||||
Err(gst::FlowError::Flushing)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
||||
self.cancel();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn event(&self, event: gst::Event) -> bool {
|
||||
if let gst::EventView::Eos(_) = event.view() {
|
||||
if let Err(error_message) = self.flush_buffer() {
|
||||
gst::error!(
|
||||
CAT,
|
||||
imp: self,
|
||||
"Failed to finalize the upload: {:?}",
|
||||
error_message
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
BaseSinkImplExt::parent_event(self, event)
|
||||
}
|
||||
}
|
|
@ -10,8 +10,6 @@
|
|||
|
||||
// The test times out on Windows for some reason, skip until we figure out why
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
#[test_with::env(AWS_ACCESS_KEY_ID)]
|
||||
#[test_with::env(AWS_SECRET_ACCESS_KEY)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use gst::prelude::*;
|
||||
|
@ -30,41 +28,9 @@ mod tests {
|
|||
});
|
||||
}
|
||||
|
||||
// Common helper
|
||||
async fn do_s3_test(key_prefix: &str) {
|
||||
init();
|
||||
|
||||
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
|
||||
let bucket =
|
||||
std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
|
||||
let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
|
||||
let uri = format!("s3://{region}/{bucket}/{key}");
|
||||
let content = "Hello, world!\n".as_bytes();
|
||||
|
||||
// Manually add the element so we can configure it before it goes to PLAYING
|
||||
let mut h1 = gst_check::Harness::new_empty();
|
||||
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
|
||||
// adding an element manually
|
||||
h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str());
|
||||
|
||||
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||
h1.play();
|
||||
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push_event(gst::event::Eos::new());
|
||||
|
||||
let mut h2 = gst_check::Harness::new("awss3src");
|
||||
h2.element().unwrap().set_property("uri", uri.clone());
|
||||
h2.play();
|
||||
|
||||
let buf = h2.pull_until_eos().unwrap().unwrap();
|
||||
assert_eq!(
|
||||
content,
|
||||
buf.into_mapped_buffer_readable().unwrap().as_slice()
|
||||
);
|
||||
|
||||
async fn delete_object(region: String, bucket: &str, key: &str) {
|
||||
let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
|
||||
aws_sdk_s3::config::Region::new(region.clone()),
|
||||
aws_sdk_s3::config::Region::new(region),
|
||||
)
|
||||
.or_default_provider();
|
||||
|
||||
|
@ -83,18 +49,118 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_simple() {
|
||||
do_s3_test("s3-test").await;
|
||||
// Common helper
|
||||
async fn do_s3_multipart_test(key_prefix: &str) {
|
||||
init();
|
||||
|
||||
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
|
||||
let bucket =
|
||||
std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
|
||||
let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
|
||||
let uri = format!("s3://{region}/{bucket}/{key}");
|
||||
let content = "Hello, world!\n".as_bytes();
|
||||
|
||||
// Manually add the element so we can configure it before it goes to PLAYING
|
||||
let mut h1 = gst_check::Harness::new_empty();
|
||||
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
|
||||
// adding an element manually
|
||||
|
||||
h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str());
|
||||
|
||||
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||
h1.play();
|
||||
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push_event(gst::event::Eos::new());
|
||||
|
||||
let mut h2 = gst_check::Harness::new("awss3src");
|
||||
h2.element().unwrap().set_property("uri", uri.clone());
|
||||
h2.play();
|
||||
|
||||
let buf = h2.pull_until_eos().unwrap().unwrap();
|
||||
assert_eq!(
|
||||
content.repeat(5),
|
||||
buf.into_mapped_buffer_readable().unwrap().as_slice()
|
||||
);
|
||||
|
||||
delete_object(region.clone(), &bucket, &key).await;
|
||||
}
|
||||
|
||||
// Common helper
|
||||
async fn do_s3_putobject_test(key_prefix: &str) {
|
||||
init();
|
||||
|
||||
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
|
||||
let bucket =
|
||||
std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
|
||||
let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
|
||||
let uri = format!("s3://{region}/{bucket}/{key}");
|
||||
let content = "Hello, world!\n".as_bytes();
|
||||
|
||||
// Manually add the element so we can configure it before it goes to PLAYING
|
||||
let mut h1 = gst_check::Harness::new_empty();
|
||||
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
|
||||
// adding an element manually
|
||||
|
||||
h1.add_parse(
|
||||
format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\"")
|
||||
.as_str(),
|
||||
);
|
||||
|
||||
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||
h1.play();
|
||||
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
||||
h1.push_event(gst::event::Eos::new());
|
||||
|
||||
let mut h2 = gst_check::Harness::new("awss3src");
|
||||
h2.element().unwrap().set_property("uri", uri.clone());
|
||||
h2.play();
|
||||
|
||||
let buf = h2.pull_until_eos().unwrap().unwrap();
|
||||
assert_eq!(
|
||||
content.repeat(5),
|
||||
buf.into_mapped_buffer_readable().unwrap().as_slice()
|
||||
);
|
||||
|
||||
delete_object(region.clone(), &bucket, &key).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_whitespace() {
|
||||
do_s3_test("s3 test").await;
|
||||
async fn test_s3_multipart_simple() {
|
||||
do_s3_multipart_test("s3-test").await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_unicode() {
|
||||
do_s3_test("s3 🧪 😱").await;
|
||||
async fn test_s3_multipart_whitespace() {
|
||||
do_s3_multipart_test("s3 test").await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_multipart_unicode() {
|
||||
do_s3_multipart_test("s3 🧪 😱").await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_put_object_simple() {
|
||||
do_s3_putobject_test("s3-put-object-test").await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_put_object_whitespace() {
|
||||
do_s3_putobject_test("s3 put object test").await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_s3_put_object_unicode() {
|
||||
do_s3_putobject_test("s3 put object 🧪 😱").await;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue