2019-06-12 08:07:39 +00:00
|
|
|
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
|
|
// option. This file may not be copied, modified, or distributed
|
|
|
|
// except according to those terms.
|
|
|
|
|
|
|
|
use glib::prelude::*;
|
|
|
|
use glib::subclass;
|
|
|
|
use glib::subclass::prelude::*;
|
|
|
|
|
|
|
|
use gst::prelude::*;
|
|
|
|
use gst::subclass::prelude::*;
|
2020-12-20 18:43:45 +00:00
|
|
|
use gst::{gst_error, gst_info, gst_trace};
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
use gst_base::subclass::prelude::*;
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
use futures::future;
|
2019-06-12 08:07:39 +00:00
|
|
|
use rusoto_core::region::Region;
|
|
|
|
use rusoto_s3::{
|
|
|
|
CompleteMultipartUploadRequest, CompletedMultipartUpload, CompletedPart,
|
|
|
|
CreateMultipartUploadRequest, S3Client, UploadPartRequest, S3,
|
|
|
|
};
|
|
|
|
|
2020-11-22 15:43:59 +00:00
|
|
|
use once_cell::sync::Lazy;
|
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
use std::convert::From;
|
|
|
|
use std::str::FromStr;
|
|
|
|
use std::sync::Mutex;
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
use crate::s3utils::{self, WaitError};
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
struct Started {
|
2020-04-20 01:28:30 +00:00
|
|
|
client: S3Client,
|
2019-06-12 08:07:39 +00:00
|
|
|
buffer: Vec<u8>,
|
|
|
|
upload_id: String,
|
|
|
|
part_number: i64,
|
|
|
|
completed_parts: Vec<CompletedPart>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Started {
|
2020-04-20 01:28:30 +00:00
|
|
|
pub fn new(client: S3Client, buffer: Vec<u8>, upload_id: String) -> Started {
|
2019-06-12 08:07:39 +00:00
|
|
|
Started {
|
2020-04-20 01:28:30 +00:00
|
|
|
client,
|
2019-06-12 08:07:39 +00:00
|
|
|
buffer,
|
|
|
|
upload_id,
|
|
|
|
part_number: 0,
|
|
|
|
completed_parts: Vec::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn increment_part_number(&mut self) -> Result<i64, gst::ErrorMessage> {
|
|
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
|
|
|
|
const MAX_MULTIPART_NUMBER: i64 = 10000;
|
|
|
|
|
|
|
|
if self.part_number > MAX_MULTIPART_NUMBER {
|
2020-12-20 18:43:45 +00:00
|
|
|
return Err(gst::error_msg!(
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::ResourceError::Failed,
|
|
|
|
[
|
|
|
|
"Maximum number of parts ({}) reached.",
|
|
|
|
MAX_MULTIPART_NUMBER
|
|
|
|
]
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
self.part_number += 1;
|
|
|
|
Ok(self.part_number)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
enum State {
|
|
|
|
Stopped,
|
|
|
|
Started(Started),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for State {
|
|
|
|
fn default() -> State {
|
|
|
|
State::Stopped
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Default size in bytes (5 MiB) of the buffer that is filled before a part is uploaded.
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
|
|
|
|
|
|
|
|
struct Settings {
|
|
|
|
region: Region,
|
|
|
|
bucket: Option<String>,
|
|
|
|
key: Option<String>,
|
|
|
|
content_type: Option<String>,
|
|
|
|
buffer_size: u64,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct S3Sink {
|
|
|
|
settings: Mutex<Settings>,
|
|
|
|
state: Mutex<State>,
|
2020-04-20 01:28:30 +00:00
|
|
|
canceller: Mutex<Option<future::AbortHandle>>,
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2020-11-22 15:43:59 +00:00
|
|
|
/// Debug category used by the log statements of this element; created on first use.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let color = gst::DebugColorFlags::empty();
    let description = Some("Amazon S3 Sink");

    gst::DebugCategory::new("rusotos3sink", color, description)
});
|
2019-10-31 22:34:21 +00:00
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
impl Default for Settings {
|
|
|
|
fn default() -> Self {
|
|
|
|
Settings {
|
|
|
|
region: Region::default(),
|
|
|
|
bucket: None,
|
|
|
|
key: None,
|
|
|
|
content_type: None,
|
|
|
|
buffer_size: DEFAULT_BUFFER_SIZE,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
static PROPERTIES: [subclass::Property; 4] = [
|
|
|
|
subclass::Property("bucket", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"S3 Bucket",
|
|
|
|
"The bucket of the file to write",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("key", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"S3 Key",
|
|
|
|
"The key of the file to write",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("region", |name| {
|
|
|
|
glib::ParamSpec::string(
|
|
|
|
name,
|
|
|
|
"AWS Region",
|
|
|
|
"An AWS region (e.g. eu-west-2).",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
subclass::Property("part-size", |name| {
|
|
|
|
glib::ParamSpec::uint64(
|
|
|
|
name,
|
|
|
|
"Part size",
|
|
|
|
"A size (in bytes) of an individual part used for multipart upload.",
|
|
|
|
5 * 1024 * 1024, // 5 MB
|
|
|
|
5 * 1024 * 1024 * 1024, // 5 GB
|
|
|
|
DEFAULT_BUFFER_SIZE,
|
|
|
|
glib::ParamFlags::READWRITE, /* + GST_PARAM_MUTABLE_READY) */
|
|
|
|
)
|
|
|
|
}),
|
|
|
|
];
|
|
|
|
|
|
|
|
impl S3Sink {
|
|
|
|
fn flush_current_buffer(
|
|
|
|
&self,
|
2020-11-14 17:24:01 +00:00
|
|
|
element: &super::S3Sink,
|
2019-06-12 08:07:39 +00:00
|
|
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
|
|
|
let upload_part_req = self.create_upload_part_request()?;
|
|
|
|
let part_number = upload_part_req.part_number;
|
|
|
|
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
};
|
2020-04-20 01:28:30 +00:00
|
|
|
|
|
|
|
let upload_part_req_future = state.client.upload_part(upload_part_req);
|
|
|
|
|
|
|
|
let output =
|
|
|
|
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
|
2020-12-20 18:43:45 +00:00
|
|
|
WaitError::FutureError(err) => Some(gst::error_msg!(
|
2020-04-20 01:28:30 +00:00
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to upload part: {}", err]
|
|
|
|
)),
|
|
|
|
WaitError::Cancelled => None,
|
|
|
|
})?;
|
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
state.completed_parts.push(CompletedPart {
|
2019-11-24 22:00:27 +00:00
|
|
|
e_tag: output.e_tag,
|
2019-06-12 08:07:39 +00:00
|
|
|
part_number: Some(part_number),
|
|
|
|
});
|
2019-10-31 22:34:21 +00:00
|
|
|
gst_info!(CAT, obj: element, "Uploaded part {}", part_number);
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn create_upload_part_request(&self) -> Result<UploadPartRequest, gst::ErrorMessage> {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let part_number = state.increment_part_number()?;
|
|
|
|
Ok(UploadPartRequest {
|
|
|
|
body: Some(rusoto_core::ByteStream::from(std::mem::replace(
|
|
|
|
&mut state.buffer,
|
|
|
|
Vec::with_capacity(settings.buffer_size as usize),
|
|
|
|
))),
|
|
|
|
bucket: settings.bucket.as_ref().unwrap().to_owned(),
|
|
|
|
key: settings.key.as_ref().unwrap().to_owned(),
|
|
|
|
upload_id: state.upload_id.to_owned(),
|
|
|
|
part_number,
|
|
|
|
..Default::default()
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
fn create_complete_multipart_upload_request(
|
|
|
|
&self,
|
|
|
|
started_state: &mut Started,
|
|
|
|
settings: &Settings,
|
|
|
|
) -> CompleteMultipartUploadRequest {
|
2019-06-12 08:07:39 +00:00
|
|
|
started_state
|
|
|
|
.completed_parts
|
|
|
|
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
|
|
|
|
|
|
|
|
let completed_upload = CompletedMultipartUpload {
|
|
|
|
parts: Some(std::mem::replace(
|
|
|
|
&mut started_state.completed_parts,
|
|
|
|
Vec::new(),
|
|
|
|
)),
|
|
|
|
};
|
|
|
|
|
|
|
|
CompleteMultipartUploadRequest {
|
|
|
|
bucket: settings.bucket.as_ref().unwrap().to_owned(),
|
|
|
|
key: settings.key.as_ref().unwrap().to_owned(),
|
|
|
|
upload_id: started_state.upload_id.to_owned(),
|
|
|
|
multipart_upload: Some(completed_upload),
|
|
|
|
..Default::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn create_create_multipart_upload_request(
|
|
|
|
&self,
|
2020-04-20 01:28:30 +00:00
|
|
|
settings: &Settings,
|
2019-06-12 08:07:39 +00:00
|
|
|
) -> Result<CreateMultipartUploadRequest, gst::ErrorMessage> {
|
|
|
|
if settings.bucket.is_none() || settings.key.is_none() {
|
2020-12-20 18:43:45 +00:00
|
|
|
return Err(gst::error_msg!(
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::ResourceError::Settings,
|
|
|
|
["Bucket or key is not defined"]
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let bucket = settings.bucket.as_ref().unwrap();
|
|
|
|
let key = settings.key.as_ref().unwrap();
|
|
|
|
|
|
|
|
Ok(CreateMultipartUploadRequest {
|
|
|
|
bucket: bucket.clone(),
|
|
|
|
key: key.clone(),
|
|
|
|
content_type: settings.content_type.clone(),
|
|
|
|
..Default::default()
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
|
2019-06-12 08:07:39 +00:00
|
|
|
if self.flush_current_buffer(element).is_err() {
|
2020-12-20 18:43:45 +00:00
|
|
|
return Err(gst::error_msg!(
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::ResourceError::Settings,
|
|
|
|
["Failed to flush internal buffer."]
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let started_state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
let complete_req = self.create_complete_multipart_upload_request(started_state, &settings);
|
|
|
|
let complete_req_future = started_state.client.complete_multipart_upload(complete_req);
|
|
|
|
|
|
|
|
s3utils::wait(&self.canceller, complete_req_future)
|
|
|
|
.map(|_| ())
|
|
|
|
.map_err(|err| match err {
|
2020-12-20 18:43:45 +00:00
|
|
|
WaitError::FutureError(err) => gst::error_msg!(
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::ResourceError::Write,
|
|
|
|
["Failed to complete multipart upload: {}.", err.to_string()]
|
2020-04-20 01:28:30 +00:00
|
|
|
),
|
|
|
|
WaitError::Cancelled => {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during stop"])
|
2020-04-20 01:28:30 +00:00
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started { .. } = *state {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
|
|
|
|
let client = S3Client::new(settings.region.clone());
|
|
|
|
|
|
|
|
let create_multipart_req = self.create_create_multipart_upload_request(&settings)?;
|
|
|
|
let create_multipart_req_future = client.create_multipart_upload(create_multipart_req);
|
|
|
|
|
|
|
|
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
|
|
|
|
|err| match err {
|
2020-12-20 18:43:45 +00:00
|
|
|
WaitError::FutureError(err) => gst::error_msg!(
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to create multipart upload: {}", err]
|
2020-04-20 01:28:30 +00:00
|
|
|
),
|
|
|
|
WaitError::Cancelled => {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::error_msg!(gst::LibraryError::Failed, ["Interrupted during start"])
|
2020-04-20 01:28:30 +00:00
|
|
|
}
|
|
|
|
},
|
|
|
|
)?;
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2019-07-04 15:30:26 +00:00
|
|
|
let upload_id = response.upload_id.ok_or_else(|| {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::error_msg!(
|
2019-07-04 15:30:26 +00:00
|
|
|
gst::ResourceError::Failed,
|
|
|
|
["Failed to get multipart upload ID"]
|
|
|
|
)
|
|
|
|
})?;
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
*state = State::Started(Started::new(
|
|
|
|
client,
|
|
|
|
Vec::with_capacity(settings.buffer_size as usize),
|
2019-06-12 08:07:39 +00:00
|
|
|
upload_id,
|
2020-04-20 01:28:30 +00:00
|
|
|
));
|
|
|
|
|
|
|
|
Ok(())
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn update_buffer(
|
|
|
|
&self,
|
|
|
|
src: &[u8],
|
2020-11-14 17:24:01 +00:00
|
|
|
element: &super::S3Sink,
|
2019-06-12 08:07:39 +00:00
|
|
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let started_state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started already");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let to_copy = std::cmp::min(
|
|
|
|
started_state.buffer.capacity() - started_state.buffer.len(),
|
|
|
|
src.len(),
|
|
|
|
);
|
|
|
|
|
|
|
|
let (head, tail) = src.split_at(to_copy);
|
|
|
|
started_state.buffer.extend_from_slice(head);
|
|
|
|
let do_flush = started_state.buffer.capacity() == started_state.buffer.len();
|
|
|
|
drop(state);
|
|
|
|
|
|
|
|
if do_flush {
|
|
|
|
self.flush_current_buffer(element)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
if to_copy < src.len() {
|
|
|
|
self.update_buffer(tail, element)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn cancel(&self) {
|
|
|
|
let mut canceller = self.canceller.lock().unwrap();
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
if let Some(c) = canceller.take() {
|
|
|
|
c.abort()
|
|
|
|
};
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ObjectSubclass for S3Sink {
|
2019-10-21 07:31:03 +00:00
|
|
|
const NAME: &'static str = "RusotoS3Sink";
|
2020-11-14 17:24:01 +00:00
|
|
|
type Type = super::S3Sink;
|
2019-06-12 08:07:39 +00:00
|
|
|
type ParentType = gst_base::BaseSink;
|
|
|
|
type Instance = gst::subclass::ElementInstanceStruct<Self>;
|
|
|
|
type Class = subclass::simple::ClassStruct<Self>;
|
|
|
|
|
2020-12-17 22:44:49 +00:00
|
|
|
glib::object_subclass!();
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
settings: Mutex::new(Default::default()),
|
|
|
|
state: Mutex::new(Default::default()),
|
|
|
|
canceller: Mutex::new(None),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn class_init(klass: &mut Self::Class) {
|
2019-06-12 08:07:39 +00:00
|
|
|
klass.set_metadata(
|
|
|
|
"Amazon S3 sink",
|
|
|
|
"Source/Network",
|
|
|
|
"Writes an object to Amazon S3",
|
|
|
|
"Marcin Kolny <mkolny@amazon.com>",
|
|
|
|
);
|
|
|
|
|
|
|
|
let caps = gst::Caps::new_any();
|
|
|
|
let sink_pad_template = gst::PadTemplate::new(
|
|
|
|
"sink",
|
|
|
|
gst::PadDirection::Sink,
|
|
|
|
gst::PadPresence::Always,
|
|
|
|
&caps,
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
klass.add_pad_template(sink_pad_template);
|
|
|
|
|
|
|
|
klass.install_properties(&PROPERTIES);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ObjectImpl for S3Sink {
|
2020-11-14 17:24:01 +00:00
|
|
|
fn set_property(&self, _obj: &Self::Type, id: usize, value: &glib::Value) {
|
2019-06-12 08:07:39 +00:00
|
|
|
let prop = &PROPERTIES[id as usize];
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
match *prop {
|
|
|
|
subclass::Property("bucket", ..) => {
|
2019-08-12 22:45:36 +00:00
|
|
|
settings.bucket = value.get::<String>().expect("type checked upstream");
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
subclass::Property("key", ..) => {
|
2019-08-12 22:45:36 +00:00
|
|
|
settings.key = value.get::<String>().expect("type checked upstream");
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
subclass::Property("region", ..) => {
|
2020-04-20 01:28:30 +00:00
|
|
|
settings.region = Region::from_str(
|
2019-08-12 22:45:36 +00:00
|
|
|
&value
|
|
|
|
.get::<String>()
|
|
|
|
.expect("type checked upstream")
|
|
|
|
.expect("set_property(\"region\"): no value provided"),
|
|
|
|
)
|
|
|
|
.unwrap();
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
subclass::Property("part-size", ..) => {
|
2019-08-12 22:45:36 +00:00
|
|
|
settings.buffer_size = value.get_some::<u64>().expect("type checked upstream");
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-19 15:55:57 +00:00
|
|
|
fn get_property(&self, _: &Self::Type, id: usize) -> glib::Value {
|
2019-06-12 08:07:39 +00:00
|
|
|
let prop = &PROPERTIES[id as usize];
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
match *prop {
|
2020-11-19 15:55:57 +00:00
|
|
|
subclass::Property("key", ..) => settings.key.to_value(),
|
|
|
|
subclass::Property("bucket", ..) => settings.bucket.to_value(),
|
|
|
|
subclass::Property("region", ..) => settings.region.name().to_value(),
|
|
|
|
subclass::Property("part-size", ..) => settings.buffer_size.to_value(),
|
2019-06-12 08:07:39 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// No element-level overrides: the trait's default implementations are used as-is.
impl ElementImpl for S3Sink {}
|
|
|
|
|
|
|
|
impl BaseSinkImpl for S3Sink {
|
2020-11-14 17:24:01 +00:00
|
|
|
fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
2020-04-20 01:28:30 +00:00
|
|
|
self.start()
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
2019-06-12 08:07:39 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
*state = State::Stopped;
|
2019-10-31 22:34:21 +00:00
|
|
|
gst_info!(CAT, obj: element, "Stopped");
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn render(
|
|
|
|
&self,
|
2020-11-14 17:24:01 +00:00
|
|
|
element: &Self::Type,
|
2019-06-12 08:07:39 +00:00
|
|
|
buffer: &gst::Buffer,
|
|
|
|
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
|
|
|
if let State::Stopped = *self.state.lock().unwrap() {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]);
|
2019-06-12 08:07:39 +00:00
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
|
2019-10-31 22:34:21 +00:00
|
|
|
gst_trace!(CAT, obj: element, "Rendering {:?}", buffer);
|
2019-12-18 05:50:10 +00:00
|
|
|
let map = buffer.map_readable().map_err(|_| {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
|
|
|
|
|
|
|
match self.update_buffer(&map, element) {
|
|
|
|
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
|
|
|
Err(err) => match err {
|
|
|
|
Some(error_message) => {
|
|
|
|
gst_error!(
|
2019-10-31 22:34:21 +00:00
|
|
|
CAT,
|
2019-06-12 08:07:39 +00:00
|
|
|
obj: element,
|
|
|
|
"Multipart upload failed: {}",
|
|
|
|
error_message
|
|
|
|
);
|
2020-06-30 20:57:22 +00:00
|
|
|
element.post_error_message(error_message);
|
2019-06-12 08:07:39 +00:00
|
|
|
Err(gst::FlowError::Error)
|
|
|
|
}
|
|
|
|
_ => {
|
2019-10-31 22:34:21 +00:00
|
|
|
gst_info!(CAT, obj: element, "Upload interrupted. Flushing...");
|
2019-06-12 08:07:39 +00:00
|
|
|
Err(gst::FlowError::Flushing)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
2019-06-12 08:07:39 +00:00
|
|
|
self.cancel();
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn event(&self, element: &Self::Type, event: gst::Event) -> bool {
|
2019-11-24 22:00:27 +00:00
|
|
|
if let gst::EventView::Eos(_) = event.view() {
|
|
|
|
if let Err(error_message) = self.finalize_upload(element) {
|
|
|
|
gst_error!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Failed to finalize the upload: {}",
|
|
|
|
error_message
|
|
|
|
);
|
|
|
|
return false;
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
BaseSinkImplExt::parent_event(self, element, event)
|
|
|
|
}
|
|
|
|
}
|