2019-06-12 08:07:39 +00:00
|
|
|
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
|
|
|
|
//
|
2022-01-15 18:40:12 +00:00
|
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
|
|
//
|
|
|
|
// SPDX-License-Identifier: MPL-2.0
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-06-03 18:20:54 +00:00
|
|
|
use gst::glib;
|
2019-06-12 08:07:39 +00:00
|
|
|
use gst::prelude::*;
|
|
|
|
use gst::subclass::prelude::*;
|
|
|
|
use gst_base::subclass::prelude::*;
|
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
use aws_sdk_s3::client::fluent_builders::{
|
|
|
|
AbortMultipartUpload, CompleteMultipartUpload, CreateMultipartUpload, UploadPart,
|
2019-06-12 08:07:39 +00:00
|
|
|
};
|
2022-05-14 05:01:35 +00:00
|
|
|
use aws_sdk_s3::config;
|
|
|
|
use aws_sdk_s3::model::{CompletedMultipartUpload, CompletedPart};
|
|
|
|
use aws_sdk_s3::types::ByteStream;
|
|
|
|
use aws_sdk_s3::{Client, Credentials, Region, RetryConfig};
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
use futures::future;
|
2020-11-22 15:43:59 +00:00
|
|
|
use once_cell::sync::Lazy;
|
2021-11-22 09:12:51 +00:00
|
|
|
use std::collections::HashMap;
|
2022-05-14 05:01:35 +00:00
|
|
|
use std::convert::From;
|
2019-06-12 08:07:39 +00:00
|
|
|
use std::sync::Mutex;
|
2022-02-23 18:10:30 +00:00
|
|
|
use std::time::Duration;
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-04-30 13:05:35 +00:00
|
|
|
use crate::s3url::*;
|
2022-05-14 05:01:35 +00:00
|
|
|
use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-12-06 12:06:10 +00:00
|
|
|
use super::OnError;
|
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
|
|
|
|
const DEFAULT_BUFFER_SIZE: u64 = 5 * 1024 * 1024;
|
2021-12-06 12:06:10 +00:00
|
|
|
const DEFAULT_MULTIPART_UPLOAD_ON_ERROR: OnError = OnError::DoNothing;
|
2022-05-14 05:01:35 +00:00
|
|
|
|
2022-03-18 09:44:17 +00:00
|
|
|
// General setting for create / abort requests
|
2022-05-14 05:01:35 +00:00
|
|
|
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
2022-03-18 09:44:17 +00:00
|
|
|
const DEFAULT_RETRY_DURATION_MSEC: u64 = 60_000;
|
2022-03-18 10:10:18 +00:00
|
|
|
// This needs to be independently configurable, as the part size can be upto 5GB
|
|
|
|
const DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC: u64 = 10_000;
|
|
|
|
const DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC: u64 = 60_000;
|
2022-03-18 09:44:17 +00:00
|
|
|
// CompletedMultipartUpload can take minutes to complete, so we need a longer value here
|
|
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
|
|
|
|
const DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC: u64 = 600_000; // 10 minutes
|
|
|
|
const DEFAULT_COMPLETE_RETRY_DURATION_MSEC: u64 = 3_600_000; // 60 minutes
|
2021-12-06 12:06:10 +00:00
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
struct Started {
|
2022-05-14 05:01:35 +00:00
|
|
|
client: Client,
|
2019-06-12 08:07:39 +00:00
|
|
|
buffer: Vec<u8>,
|
|
|
|
upload_id: String,
|
|
|
|
part_number: i64,
|
|
|
|
completed_parts: Vec<CompletedPart>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Started {
|
2022-05-14 05:01:35 +00:00
|
|
|
pub fn new(client: Client, buffer: Vec<u8>, upload_id: String) -> Started {
|
2019-06-12 08:07:39 +00:00
|
|
|
Started {
|
2020-04-20 01:28:30 +00:00
|
|
|
client,
|
2019-06-12 08:07:39 +00:00
|
|
|
buffer,
|
|
|
|
upload_id,
|
|
|
|
part_number: 0,
|
|
|
|
completed_parts: Vec::new(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn increment_part_number(&mut self) -> Result<i64, gst::ErrorMessage> {
|
|
|
|
// https://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
|
|
|
|
const MAX_MULTIPART_NUMBER: i64 = 10000;
|
|
|
|
|
|
|
|
if self.part_number > MAX_MULTIPART_NUMBER {
|
2020-12-20 18:43:45 +00:00
|
|
|
return Err(gst::error_msg!(
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::ResourceError::Failed,
|
|
|
|
[
|
|
|
|
"Maximum number of parts ({}) reached.",
|
|
|
|
MAX_MULTIPART_NUMBER
|
|
|
|
]
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
self.part_number += 1;
|
|
|
|
Ok(self.part_number)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
enum State {
|
|
|
|
Stopped,
|
|
|
|
Started(Started),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for State {
|
|
|
|
fn default() -> State {
|
|
|
|
State::Stopped
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
struct Settings {
|
2021-06-21 10:20:32 +00:00
|
|
|
region: Region,
|
|
|
|
bucket: Option<String>,
|
|
|
|
key: Option<String>,
|
2019-06-12 08:07:39 +00:00
|
|
|
content_type: Option<String>,
|
|
|
|
buffer_size: u64,
|
2021-09-27 13:49:12 +00:00
|
|
|
access_key: Option<String>,
|
|
|
|
secret_access_key: Option<String>,
|
2022-06-16 07:16:28 +00:00
|
|
|
session_token: Option<String>,
|
2021-11-22 09:12:51 +00:00
|
|
|
metadata: Option<gst::Structure>,
|
2022-05-14 05:01:35 +00:00
|
|
|
retry_attempts: u32,
|
2021-12-06 12:06:10 +00:00
|
|
|
multipart_upload_on_error: OnError,
|
2022-05-14 05:01:35 +00:00
|
|
|
request_timeout: Duration,
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2021-06-21 10:20:32 +00:00
|
|
|
impl Settings {
|
|
|
|
fn to_uri(&self) -> String {
|
|
|
|
format!(
|
|
|
|
"s3://{}/{}/{}",
|
2022-05-14 05:01:35 +00:00
|
|
|
self.region,
|
2021-06-21 10:20:32 +00:00
|
|
|
self.bucket.as_ref().unwrap(),
|
|
|
|
self.key.as_ref().unwrap()
|
|
|
|
)
|
|
|
|
}
|
2021-11-22 09:12:51 +00:00
|
|
|
|
|
|
|
fn to_metadata(&self, element: &super::S3Sink) -> Option<HashMap<String, String>> {
|
|
|
|
self.metadata.as_ref().map(|structure| {
|
|
|
|
let mut hash = HashMap::new();
|
|
|
|
|
|
|
|
for (key, value) in structure.iter() {
|
|
|
|
if let Ok(Ok(value_str)) = value.transform::<String>().map(|v| v.get()) {
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::log!(CAT, obj: element, "metadata '{}' -> '{}'", key, value_str);
|
2021-11-22 09:12:51 +00:00
|
|
|
hash.insert(key.to_string(), value_str);
|
|
|
|
} else {
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::warning!(
|
2021-11-22 09:12:51 +00:00
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Failed to convert metadata '{}' to string ('{:?}')",
|
|
|
|
key,
|
|
|
|
value
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
hash
|
|
|
|
})
|
|
|
|
}
|
2021-06-21 10:20:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Settings {
|
|
|
|
fn default() -> Self {
|
|
|
|
Settings {
|
2022-05-14 05:01:35 +00:00
|
|
|
region: Region::new("us-west-2"),
|
2021-06-21 10:20:32 +00:00
|
|
|
bucket: None,
|
|
|
|
key: None,
|
|
|
|
content_type: None,
|
2021-09-27 13:49:12 +00:00
|
|
|
access_key: None,
|
|
|
|
secret_access_key: None,
|
2022-06-16 07:16:28 +00:00
|
|
|
session_token: None,
|
2021-11-22 09:12:51 +00:00
|
|
|
metadata: None,
|
2022-05-14 05:01:35 +00:00
|
|
|
buffer_size: DEFAULT_BUFFER_SIZE,
|
|
|
|
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
|
2021-12-06 12:06:10 +00:00
|
|
|
multipart_upload_on_error: DEFAULT_MULTIPART_UPLOAD_ON_ERROR,
|
2022-05-14 05:01:35 +00:00
|
|
|
request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
|
2021-06-21 10:20:32 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-07 16:22:24 +00:00
|
|
|
#[derive(Default)]
|
2019-06-12 08:07:39 +00:00
|
|
|
pub struct S3Sink {
|
2021-04-30 13:05:35 +00:00
|
|
|
url: Mutex<Option<GstS3Url>>,
|
2019-06-12 08:07:39 +00:00
|
|
|
settings: Mutex<Settings>,
|
|
|
|
state: Mutex<State>,
|
2020-04-20 01:28:30 +00:00
|
|
|
canceller: Mutex<Option<future::AbortHandle>>,
|
2021-12-06 12:06:10 +00:00
|
|
|
abort_multipart_canceller: Mutex<Option<future::AbortHandle>>,
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2020-11-22 15:43:59 +00:00
|
|
|
/// Debug category used by the log statements in this file, created on first use.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let color = gst::DebugColorFlags::empty();
    let description = Some("Amazon S3 Sink");

    gst::DebugCategory::new("aws3sink", color, description)
});
|
2019-10-31 22:34:21 +00:00
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
impl S3Sink {
|
|
|
|
fn flush_current_buffer(
|
|
|
|
&self,
|
2020-11-14 17:24:01 +00:00
|
|
|
element: &super::S3Sink,
|
2019-06-12 08:07:39 +00:00
|
|
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
2022-05-14 05:01:35 +00:00
|
|
|
let upload_part_req: UploadPart = self.create_upload_part_request()?;
|
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
};
|
2020-04-20 01:28:30 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let part_number = state.part_number;
|
2022-02-23 18:10:30 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let upload_part_req_future = upload_part_req.send();
|
|
|
|
let output =
|
|
|
|
s3utils::wait(&self.canceller, upload_part_req_future).map_err(|err| match err {
|
|
|
|
WaitError::FutureError(err) => {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
match settings.multipart_upload_on_error {
|
|
|
|
OnError::Abort => {
|
|
|
|
gst::log!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Aborting multipart upload request with id: {}",
|
|
|
|
state.upload_id
|
|
|
|
);
|
|
|
|
match self.abort_multipart_upload_request(state) {
|
|
|
|
Ok(()) => {
|
|
|
|
gst::log!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Aborting multipart upload request succeeded."
|
|
|
|
);
|
|
|
|
}
|
|
|
|
Err(err) => gst::error!(
|
2021-12-06 12:06:10 +00:00
|
|
|
CAT,
|
|
|
|
obj: element,
|
2022-05-14 05:01:35 +00:00
|
|
|
"Aborting multipart upload failed: {}",
|
|
|
|
err.to_string()
|
|
|
|
),
|
2021-12-06 12:06:10 +00:00
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
}
|
|
|
|
OnError::Complete => {
|
|
|
|
gst::log!(
|
2021-12-06 12:06:10 +00:00
|
|
|
CAT,
|
|
|
|
obj: element,
|
2022-05-14 05:01:35 +00:00
|
|
|
"Completing multipart upload request with id: {}",
|
|
|
|
state.upload_id
|
|
|
|
);
|
|
|
|
match self.complete_multipart_upload_request(state) {
|
|
|
|
Ok(()) => {
|
|
|
|
gst::log!(
|
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Complete multipart upload request succeeded."
|
|
|
|
);
|
|
|
|
}
|
|
|
|
Err(err) => gst::error!(
|
2021-12-06 12:06:10 +00:00
|
|
|
CAT,
|
|
|
|
obj: element,
|
2022-05-14 05:01:35 +00:00
|
|
|
"Completing multipart upload failed: {}",
|
|
|
|
err.to_string()
|
|
|
|
),
|
2021-12-06 12:06:10 +00:00
|
|
|
}
|
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
OnError::DoNothing => (),
|
2021-12-06 12:06:10 +00:00
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
Some(gst::error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to upload part: {}", err]
|
|
|
|
))
|
2021-12-06 12:06:10 +00:00
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
WaitError::Cancelled => None,
|
|
|
|
})?;
|
|
|
|
|
|
|
|
let completed_part = CompletedPart::builder()
|
|
|
|
.set_e_tag(output.e_tag)
|
|
|
|
.set_part_number(Some(part_number as i32))
|
|
|
|
.build();
|
|
|
|
state.completed_parts.push(completed_part);
|
2020-04-20 01:28:30 +00:00
|
|
|
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::info!(CAT, obj: element, "Uploaded part {}", part_number);
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
fn create_upload_part_request(&self) -> Result<UploadPart, gst::ErrorMessage> {
|
2021-04-30 13:05:35 +00:00
|
|
|
let url = self.url.lock().unwrap();
|
2022-05-14 05:01:35 +00:00
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
};
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let part_number = state.increment_part_number()?;
|
|
|
|
let body = Some(ByteStream::from(std::mem::replace(
|
|
|
|
&mut state.buffer,
|
|
|
|
Vec::with_capacity(settings.buffer_size as usize),
|
|
|
|
)));
|
|
|
|
|
|
|
|
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
|
|
|
|
let key = Some(url.as_ref().unwrap().object.to_owned());
|
|
|
|
let upload_id = Some(state.upload_id.to_owned());
|
|
|
|
|
|
|
|
let client = &state.client;
|
|
|
|
let upload_part = client
|
|
|
|
.upload_part()
|
|
|
|
.set_body(body)
|
|
|
|
.set_bucket(bucket)
|
|
|
|
.set_key(key)
|
|
|
|
.set_upload_id(upload_id)
|
|
|
|
.set_part_number(Some(part_number as i32));
|
|
|
|
|
|
|
|
Ok(upload_part)
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
fn create_complete_multipart_upload_request(
|
|
|
|
&self,
|
2022-05-14 05:01:35 +00:00
|
|
|
started_state: &mut Started,
|
|
|
|
) -> CompleteMultipartUpload {
|
|
|
|
started_state
|
|
|
|
.completed_parts
|
|
|
|
.sort_by(|a, b| a.part_number.cmp(&b.part_number));
|
|
|
|
|
|
|
|
let parts = Some(std::mem::take(&mut started_state.completed_parts));
|
|
|
|
|
|
|
|
let completed_upload = CompletedMultipartUpload::builder().set_parts(parts).build();
|
|
|
|
|
2021-04-30 13:05:35 +00:00
|
|
|
let url = self.url.lock().unwrap();
|
2022-05-14 05:01:35 +00:00
|
|
|
let client = &started_state.client;
|
|
|
|
|
|
|
|
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
|
|
|
|
let key = Some(url.as_ref().unwrap().object.to_owned());
|
|
|
|
let upload_id = Some(started_state.upload_id.to_owned());
|
|
|
|
let multipart_upload = Some(completed_upload);
|
|
|
|
|
|
|
|
client
|
|
|
|
.complete_multipart_upload()
|
|
|
|
.set_bucket(bucket)
|
|
|
|
.set_key(key)
|
|
|
|
.set_upload_id(upload_id)
|
|
|
|
.set_multipart_upload(multipart_upload)
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn create_create_multipart_upload_request(
|
|
|
|
&self,
|
2022-05-14 05:01:35 +00:00
|
|
|
client: &Client,
|
2021-04-30 13:05:35 +00:00
|
|
|
url: &GstS3Url,
|
2020-04-20 01:28:30 +00:00
|
|
|
settings: &Settings,
|
2022-05-14 05:01:35 +00:00
|
|
|
) -> CreateMultipartUpload {
|
|
|
|
let bucket = Some(url.bucket.clone());
|
|
|
|
let key = Some(url.object.clone());
|
|
|
|
let content_type = settings.content_type.clone();
|
|
|
|
let metadata = settings.to_metadata(&self.instance());
|
|
|
|
|
|
|
|
client
|
|
|
|
.create_multipart_upload()
|
|
|
|
.set_bucket(bucket)
|
|
|
|
.set_key(key)
|
|
|
|
.set_content_type(content_type)
|
|
|
|
.set_metadata(metadata)
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2021-12-06 12:06:10 +00:00
|
|
|
fn create_abort_multipart_upload_request(
|
|
|
|
&self,
|
2022-05-14 05:01:35 +00:00
|
|
|
client: &Client,
|
2021-12-06 12:06:10 +00:00
|
|
|
url: &GstS3Url,
|
|
|
|
started_state: &Started,
|
2022-05-14 05:01:35 +00:00
|
|
|
) -> AbortMultipartUpload {
|
|
|
|
let bucket = Some(url.bucket.clone());
|
|
|
|
let key = Some(url.object.clone());
|
|
|
|
|
|
|
|
client
|
|
|
|
.abort_multipart_upload()
|
|
|
|
.set_bucket(bucket)
|
|
|
|
.set_expected_bucket_owner(None)
|
|
|
|
.set_key(key)
|
|
|
|
.set_request_payer(None)
|
|
|
|
.set_upload_id(Some(started_state.upload_id.to_owned()))
|
2021-12-06 12:06:10 +00:00
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-12-06 12:06:10 +00:00
|
|
|
fn abort_multipart_upload_request(
|
|
|
|
&self,
|
|
|
|
started_state: &Started,
|
|
|
|
) -> Result<(), gst::ErrorMessage> {
|
2022-06-30 12:44:07 +00:00
|
|
|
let s3url = {
|
|
|
|
let url = self.url.lock().unwrap();
|
|
|
|
match *url {
|
|
|
|
Some(ref url) => url.clone(),
|
|
|
|
None => unreachable!("Element should be started"),
|
|
|
|
}
|
2020-04-20 01:28:30 +00:00
|
|
|
};
|
2022-03-18 09:44:17 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let client = &started_state.client;
|
|
|
|
let abort_req = self.create_abort_multipart_upload_request(client, &s3url, started_state);
|
|
|
|
let abort_req_future = abort_req.send();
|
|
|
|
|
|
|
|
s3utils::wait(&self.abort_multipart_canceller, abort_req_future)
|
|
|
|
.map(|_| ())
|
|
|
|
.map_err(|err| match err {
|
|
|
|
WaitError::FutureError(err) => {
|
|
|
|
gst::error_msg!(
|
|
|
|
gst::ResourceError::Write,
|
|
|
|
["Failed to abort multipart upload: {}.", err.to_string()]
|
|
|
|
)
|
|
|
|
}
|
|
|
|
WaitError::Cancelled => {
|
|
|
|
gst::error_msg!(
|
|
|
|
gst::ResourceError::Write,
|
|
|
|
["Abort multipart upload request interrupted."]
|
|
|
|
)
|
|
|
|
}
|
|
|
|
})
|
2021-12-06 12:06:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn complete_multipart_upload_request(
|
|
|
|
&self,
|
|
|
|
started_state: &mut Started,
|
|
|
|
) -> Result<(), gst::ErrorMessage> {
|
2022-05-14 05:01:35 +00:00
|
|
|
let complete_req = self.create_complete_multipart_upload_request(started_state);
|
|
|
|
let complete_req_future = complete_req.send();
|
2022-03-18 09:44:17 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
s3utils::wait(&self.canceller, complete_req_future)
|
|
|
|
.map(|_| ())
|
|
|
|
.map_err(|err| match err {
|
|
|
|
WaitError::FutureError(err) => gst::error_msg!(
|
|
|
|
gst::ResourceError::Write,
|
|
|
|
["Failed to complete multipart upload: {}.", err.to_string()]
|
|
|
|
),
|
|
|
|
WaitError::Cancelled => {
|
|
|
|
gst::error_msg!(
|
|
|
|
gst::LibraryError::Failed,
|
|
|
|
["Complete multipart upload request interrupted"]
|
|
|
|
)
|
|
|
|
}
|
|
|
|
})
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2021-12-06 12:06:10 +00:00
|
|
|
fn finalize_upload(&self, element: &super::S3Sink) -> Result<(), gst::ErrorMessage> {
|
|
|
|
if self.flush_current_buffer(element).is_err() {
|
|
|
|
return Err(gst::error_msg!(
|
|
|
|
gst::ResourceError::Settings,
|
|
|
|
["Failed to flush internal buffer."]
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let started_state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
self.complete_multipart_upload_request(started_state)
|
|
|
|
}
|
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started { .. } = *state {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
|
2022-06-30 12:44:07 +00:00
|
|
|
let s3url = {
|
|
|
|
let url = self.url.lock().unwrap();
|
|
|
|
match *url {
|
|
|
|
Some(ref url) => url.clone(),
|
|
|
|
None => {
|
|
|
|
return Err(gst::error_msg!(
|
|
|
|
gst::ResourceError::Settings,
|
|
|
|
["Cannot start without a URL being set"]
|
|
|
|
));
|
|
|
|
}
|
2021-04-30 13:05:35 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let timeout_config = s3utils::timeout_config(settings.request_timeout);
|
|
|
|
|
|
|
|
let cred = match (
|
2021-09-27 13:49:12 +00:00
|
|
|
settings.access_key.as_ref(),
|
|
|
|
settings.secret_access_key.as_ref(),
|
|
|
|
) {
|
2022-05-14 05:01:35 +00:00
|
|
|
(Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
|
|
|
|
access_key.clone(),
|
|
|
|
secret_access_key.clone(),
|
2022-06-16 07:16:28 +00:00
|
|
|
settings.session_token.clone(),
|
2022-05-14 05:01:35 +00:00
|
|
|
None,
|
2022-05-26 09:52:42 +00:00
|
|
|
"aws-s3-sink",
|
2022-05-14 05:01:35 +00:00
|
|
|
)),
|
|
|
|
_ => None,
|
2021-09-27 13:49:12 +00:00
|
|
|
};
|
2020-04-20 01:28:30 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let sdk_config =
|
|
|
|
s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred)
|
|
|
|
.map_err(|err| match err {
|
|
|
|
WaitError::FutureError(err) => gst::error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to create SDK config: {}", err]
|
|
|
|
),
|
|
|
|
WaitError::Cancelled => {
|
|
|
|
gst::error_msg!(
|
|
|
|
gst::LibraryError::Failed,
|
|
|
|
["SDK config request interrupted during start"]
|
|
|
|
)
|
|
|
|
}
|
|
|
|
})?;
|
2020-04-20 01:28:30 +00:00
|
|
|
|
2022-05-14 05:01:35 +00:00
|
|
|
let config = config::Builder::from(&sdk_config)
|
|
|
|
.retry_config(RetryConfig::new().with_max_attempts(settings.retry_attempts))
|
|
|
|
.build();
|
|
|
|
let client = Client::from_conf(config);
|
|
|
|
|
|
|
|
let create_multipart_req =
|
|
|
|
self.create_create_multipart_upload_request(&client, &s3url, &settings);
|
|
|
|
let create_multipart_req_future = create_multipart_req.send();
|
|
|
|
|
|
|
|
let response = s3utils::wait(&self.canceller, create_multipart_req_future).map_err(
|
|
|
|
|err| match err {
|
|
|
|
WaitError::FutureError(err) => gst::error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to create multipart upload: {}", err]
|
|
|
|
),
|
|
|
|
WaitError::Cancelled => {
|
|
|
|
gst::error_msg!(
|
|
|
|
gst::LibraryError::Failed,
|
|
|
|
["Create multipart request interrupted during start"]
|
|
|
|
)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
)?;
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2019-07-04 15:30:26 +00:00
|
|
|
let upload_id = response.upload_id.ok_or_else(|| {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::error_msg!(
|
2019-07-04 15:30:26 +00:00
|
|
|
gst::ResourceError::Failed,
|
|
|
|
["Failed to get multipart upload ID"]
|
|
|
|
)
|
|
|
|
})?;
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
*state = State::Started(Started::new(
|
|
|
|
client,
|
|
|
|
Vec::with_capacity(settings.buffer_size as usize),
|
2019-06-12 08:07:39 +00:00
|
|
|
upload_id,
|
2020-04-20 01:28:30 +00:00
|
|
|
));
|
|
|
|
|
|
|
|
Ok(())
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn update_buffer(
|
|
|
|
&self,
|
|
|
|
src: &[u8],
|
2020-11-14 17:24:01 +00:00
|
|
|
element: &super::S3Sink,
|
2019-06-12 08:07:39 +00:00
|
|
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let started_state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
unreachable!("Element should be started already");
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let to_copy = std::cmp::min(
|
|
|
|
started_state.buffer.capacity() - started_state.buffer.len(),
|
|
|
|
src.len(),
|
|
|
|
);
|
|
|
|
|
|
|
|
let (head, tail) = src.split_at(to_copy);
|
|
|
|
started_state.buffer.extend_from_slice(head);
|
|
|
|
let do_flush = started_state.buffer.capacity() == started_state.buffer.len();
|
|
|
|
drop(state);
|
|
|
|
|
|
|
|
if do_flush {
|
|
|
|
self.flush_current_buffer(element)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
if to_copy < src.len() {
|
|
|
|
self.update_buffer(tail, element)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn cancel(&self) {
|
|
|
|
let mut canceller = self.canceller.lock().unwrap();
|
2021-12-06 12:06:10 +00:00
|
|
|
let mut abort_canceller = self.abort_multipart_canceller.lock().unwrap();
|
|
|
|
|
|
|
|
if let Some(c) = abort_canceller.take() {
|
|
|
|
c.abort()
|
|
|
|
};
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2020-04-20 01:28:30 +00:00
|
|
|
if let Some(c) = canceller.take() {
|
|
|
|
c.abort()
|
|
|
|
};
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
2021-04-30 13:05:35 +00:00
|
|
|
|
2021-06-21 10:20:32 +00:00
|
|
|
fn set_uri(
|
|
|
|
self: &S3Sink,
|
|
|
|
object: &super::S3Sink,
|
|
|
|
url_str: Option<&str>,
|
|
|
|
) -> Result<(), glib::Error> {
|
2021-04-30 13:05:35 +00:00
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started { .. } = *state {
|
|
|
|
return Err(glib::Error::new(
|
|
|
|
gst::URIError::BadState,
|
|
|
|
"Cannot set URI on a started s3sink",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut url = self.url.lock().unwrap();
|
|
|
|
|
|
|
|
if url_str.is_none() {
|
|
|
|
*url = None;
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::debug!(CAT, obj: object, "Setting uri to {:?}", url_str);
|
2021-06-21 10:20:32 +00:00
|
|
|
|
2021-04-30 13:05:35 +00:00
|
|
|
let url_str = url_str.unwrap();
|
|
|
|
match parse_s3_url(url_str) {
|
|
|
|
Ok(s3url) => {
|
|
|
|
*url = Some(s3url);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
Err(_) => Err(glib::Error::new(
|
|
|
|
gst::URIError::BadUri,
|
|
|
|
"Could not parse URI",
|
|
|
|
)),
|
|
|
|
}
|
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2021-03-07 16:22:24 +00:00
|
|
|
#[glib::object_subclass]
|
2019-06-12 08:07:39 +00:00
|
|
|
impl ObjectSubclass for S3Sink {
|
2022-06-25 05:59:51 +00:00
|
|
|
const NAME: &'static str = "AwsS3Sink";
|
2020-11-14 17:24:01 +00:00
|
|
|
type Type = super::S3Sink;
|
2019-06-12 08:07:39 +00:00
|
|
|
type ParentType = gst_base::BaseSink;
|
2021-04-30 13:05:35 +00:00
|
|
|
type Interfaces = (gst::URIHandler,);
|
2021-01-21 18:21:29 +00:00
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-01-21 18:21:29 +00:00
|
|
|
impl ObjectImpl for S3Sink {
|
|
|
|
fn properties() -> &'static [glib::ParamSpec] {
|
|
|
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
|
|
|
vec![
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecString::new(
|
2021-06-21 10:20:32 +00:00
|
|
|
"bucket",
|
|
|
|
"S3 Bucket",
|
|
|
|
"The bucket of the file to write",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecString::new(
|
2021-06-21 10:20:32 +00:00
|
|
|
"key",
|
|
|
|
"S3 Key",
|
|
|
|
"The key of the file to write",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecString::new(
|
2021-06-21 10:20:32 +00:00
|
|
|
"region",
|
|
|
|
"AWS Region",
|
|
|
|
"An AWS region (e.g. eu-west-2).",
|
2022-05-14 05:01:35 +00:00
|
|
|
Some("us-west-2"),
|
2021-06-21 10:20:32 +00:00
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecUInt64::new(
|
2021-01-21 18:21:29 +00:00
|
|
|
"part-size",
|
|
|
|
"Part size",
|
|
|
|
"A size (in bytes) of an individual part used for multipart upload.",
|
|
|
|
5 * 1024 * 1024, // 5 MB
|
|
|
|
5 * 1024 * 1024 * 1024, // 5 GB
|
|
|
|
DEFAULT_BUFFER_SIZE,
|
2021-01-31 12:44:45 +00:00
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
2021-01-21 18:21:29 +00:00
|
|
|
),
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecString::new(
|
2021-04-30 13:05:35 +00:00
|
|
|
"uri",
|
|
|
|
"URI",
|
|
|
|
"The S3 object URI",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecString::new(
|
2021-09-27 13:49:12 +00:00
|
|
|
"access-key",
|
|
|
|
"Access Key",
|
|
|
|
"AWS Access Key",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-11-20 10:25:14 +00:00
|
|
|
glib::ParamSpecString::new(
|
2021-09-27 13:49:12 +00:00
|
|
|
"secret-access-key",
|
|
|
|
"Secret Access Key",
|
|
|
|
"AWS Secret Access Key",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2022-06-16 07:16:28 +00:00
|
|
|
glib::ParamSpecString::new(
|
|
|
|
"session-token",
|
|
|
|
"Session Token",
|
|
|
|
"AWS temporary Session Token from STS",
|
|
|
|
None,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-11-22 09:12:51 +00:00
|
|
|
glib::ParamSpecBoxed::new(
|
|
|
|
"metadata",
|
|
|
|
"Metadata",
|
|
|
|
"A map of metadata to store with the object in S3; field values need to be convertible to strings.",
|
|
|
|
gst::Structure::static_type(),
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2021-12-06 12:06:10 +00:00
|
|
|
glib::ParamSpecEnum::new(
|
|
|
|
"on-error",
|
|
|
|
"Whether to upload or complete the multipart upload on error",
|
|
|
|
"Do nothing, abort or complete a multipart upload request on error",
|
|
|
|
OnError::static_type(),
|
|
|
|
DEFAULT_MULTIPART_UPLOAD_ON_ERROR as i32,
|
|
|
|
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
|
|
|
|
),
|
2022-05-14 05:01:35 +00:00
|
|
|
glib::ParamSpecUInt::new(
|
|
|
|
"retry-attempts",
|
|
|
|
"Retry attempts",
|
|
|
|
"Number of times AWS SDK attempts a request before abandoning the request",
|
|
|
|
1,
|
|
|
|
10,
|
|
|
|
DEFAULT_RETRY_ATTEMPTS,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
2022-03-18 10:10:18 +00:00
|
|
|
glib::ParamSpecInt64::new(
|
|
|
|
"request-timeout",
|
|
|
|
"Request timeout",
|
|
|
|
"Timeout for general S3 requests (in ms, set to -1 for infinity)",
|
|
|
|
-1,
|
|
|
|
std::i64::MAX,
|
|
|
|
DEFAULT_REQUEST_TIMEOUT_MSEC as i64,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
|
|
|
glib::ParamSpecInt64::new(
|
2022-05-14 05:01:35 +00:00
|
|
|
"upload-part-request-timeout",
|
|
|
|
"Upload part request timeout",
|
|
|
|
"Timeout for a single upload part request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)",
|
2022-03-18 10:10:18 +00:00
|
|
|
-1,
|
|
|
|
std::i64::MAX,
|
2022-05-14 05:01:35 +00:00
|
|
|
DEFAULT_UPLOAD_PART_REQUEST_TIMEOUT_MSEC as i64,
|
2022-03-18 10:10:18 +00:00
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
2022-02-23 18:10:30 +00:00
|
|
|
glib::ParamSpecInt64::new(
|
2022-05-14 05:01:35 +00:00
|
|
|
"complete-upload-request-timeout",
|
|
|
|
"Complete upload request timeout",
|
|
|
|
"Timeout for the complete multipart upload request (in ms, set to -1 for infinity) (Deprecated. Use request-timeout.)",
|
2022-02-23 18:10:30 +00:00
|
|
|
-1,
|
|
|
|
std::i64::MAX,
|
2022-05-14 05:01:35 +00:00
|
|
|
DEFAULT_COMPLETE_REQUEST_TIMEOUT_MSEC as i64,
|
2022-02-23 18:10:30 +00:00
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
|
|
|
glib::ParamSpecInt64::new(
|
2022-05-14 05:01:35 +00:00
|
|
|
"retry-duration",
|
|
|
|
"Retry duration",
|
|
|
|
"How long we should retry general S3 requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
|
2022-02-23 18:10:30 +00:00
|
|
|
-1,
|
|
|
|
std::i64::MAX,
|
2022-05-14 05:01:35 +00:00
|
|
|
DEFAULT_RETRY_DURATION_MSEC as i64,
|
2022-02-23 18:10:30 +00:00
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
2022-03-18 10:10:18 +00:00
|
|
|
glib::ParamSpecInt64::new(
|
2022-05-14 05:01:35 +00:00
|
|
|
"upload-part-retry-duration",
|
|
|
|
"Upload part retry duration",
|
|
|
|
"How long we should retry upload part requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
|
2022-03-18 10:10:18 +00:00
|
|
|
-1,
|
|
|
|
std::i64::MAX,
|
2022-05-14 05:01:35 +00:00
|
|
|
DEFAULT_UPLOAD_PART_RETRY_DURATION_MSEC as i64,
|
2022-03-18 10:10:18 +00:00
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
|
|
|
glib::ParamSpecInt64::new(
|
|
|
|
"complete-upload-retry-duration",
|
|
|
|
"Complete upload retry duration",
|
2022-05-14 05:01:35 +00:00
|
|
|
"How long we should retry complete multipart upload requests before giving up (in ms, set to -1 for infinity) (Deprecated. Use retry-attempts.)",
|
2022-03-18 10:10:18 +00:00
|
|
|
-1,
|
|
|
|
std::i64::MAX,
|
|
|
|
DEFAULT_COMPLETE_RETRY_DURATION_MSEC as i64,
|
|
|
|
glib::ParamFlags::READWRITE,
|
|
|
|
),
|
2021-01-21 18:21:29 +00:00
|
|
|
]
|
|
|
|
});
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-01-21 18:21:29 +00:00
|
|
|
PROPERTIES.as_ref()
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2021-01-21 18:21:29 +00:00
|
|
|
fn set_property(
|
|
|
|
&self,
|
2021-04-30 13:05:35 +00:00
|
|
|
obj: &Self::Type,
|
2021-01-21 18:21:29 +00:00
|
|
|
_id: usize,
|
|
|
|
value: &glib::Value,
|
|
|
|
pspec: &glib::ParamSpec,
|
|
|
|
) {
|
2019-06-12 08:07:39 +00:00
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::debug!(
|
2021-11-22 10:28:46 +00:00
|
|
|
CAT,
|
|
|
|
obj: obj,
|
|
|
|
"Setting property '{}' to '{:?}'",
|
|
|
|
pspec.name(),
|
|
|
|
value
|
|
|
|
);
|
|
|
|
|
2021-04-12 12:49:54 +00:00
|
|
|
match pspec.name() {
|
2021-06-21 10:20:32 +00:00
|
|
|
"bucket" => {
|
|
|
|
settings.bucket = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
if settings.key.is_some() {
|
|
|
|
let _ = self.set_uri(obj, Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"key" => {
|
|
|
|
settings.key = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
if settings.bucket.is_some() {
|
|
|
|
let _ = self.set_uri(obj, Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"region" => {
|
2021-09-27 16:16:26 +00:00
|
|
|
let region = value.get::<String>().expect("type checked upstream");
|
2022-05-14 05:01:35 +00:00
|
|
|
settings.region = Region::new(region);
|
2021-06-21 10:20:32 +00:00
|
|
|
if settings.key.is_some() && settings.bucket.is_some() {
|
|
|
|
let _ = self.set_uri(obj, Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
2021-01-21 18:21:29 +00:00
|
|
|
"part-size" => {
|
2021-04-25 12:41:22 +00:00
|
|
|
settings.buffer_size = value.get::<u64>().expect("type checked upstream");
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
2021-04-30 13:05:35 +00:00
|
|
|
"uri" => {
|
|
|
|
let _ = self.set_uri(obj, value.get().expect("type checked upstream"));
|
|
|
|
}
|
2021-09-27 13:49:12 +00:00
|
|
|
"access-key" => {
|
|
|
|
settings.access_key = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"secret-access-key" => {
|
|
|
|
settings.secret_access_key = value.get().expect("type checked upstream");
|
|
|
|
}
|
2022-06-16 07:16:28 +00:00
|
|
|
"session-token" => {
|
|
|
|
settings.session_token = value.get().expect("type checked upstream");
|
|
|
|
}
|
2021-11-22 09:12:51 +00:00
|
|
|
"metadata" => {
|
|
|
|
settings.metadata = value.get().expect("type checked upstream");
|
|
|
|
}
|
2021-12-06 12:06:10 +00:00
|
|
|
"on-error" => {
|
|
|
|
settings.multipart_upload_on_error =
|
|
|
|
value.get::<OnError>().expect("type checked upstream");
|
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
"retry-attempts" => {
|
|
|
|
settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
|
|
|
|
}
|
2022-03-18 10:10:18 +00:00
|
|
|
"request-timeout" => {
|
|
|
|
settings.request_timeout =
|
|
|
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
|
|
|
}
|
2022-02-23 18:10:30 +00:00
|
|
|
"upload-part-request-timeout" => {
|
2022-05-14 05:01:35 +00:00
|
|
|
settings.request_timeout =
|
2022-03-18 10:10:18 +00:00
|
|
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
|
|
|
}
|
|
|
|
"complete-upload-request-timeout" => {
|
2022-05-14 05:01:35 +00:00
|
|
|
settings.request_timeout =
|
2022-03-18 10:10:18 +00:00
|
|
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
|
|
|
}
|
2022-06-06 13:27:50 +00:00
|
|
|
"retry-duration" => {
|
2022-05-14 05:01:35 +00:00
|
|
|
/*
|
|
|
|
* To maintain backwards compatibility calculate retry attempts
|
|
|
|
* by dividing the provided duration from request timeout.
|
|
|
|
*/
|
|
|
|
let value = value.get::<i64>().expect("type checked upstream");
|
|
|
|
let request_timeout = duration_to_millis(Some(settings.request_timeout));
|
|
|
|
let retry_attempts = if value > request_timeout {
|
|
|
|
value / request_timeout
|
|
|
|
} else {
|
|
|
|
request_timeout / value
|
|
|
|
};
|
|
|
|
settings.retry_attempts = retry_attempts as u32;
|
2022-02-23 18:10:30 +00:00
|
|
|
}
|
2022-06-06 13:27:50 +00:00
|
|
|
"upload-part-retry-duration" | "complete-upload-retry-duration" => {
|
|
|
|
gst::warning!(CAT, "Use retry-attempts. retry/upload-part/complete-upload-retry duration are deprecated.");
|
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-20 12:57:40 +00:00
|
|
|
fn property(&self, _: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
2019-06-12 08:07:39 +00:00
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
2021-04-12 12:49:54 +00:00
|
|
|
match pspec.name() {
|
2021-06-21 10:20:32 +00:00
|
|
|
"key" => settings.key.to_value(),
|
|
|
|
"bucket" => settings.bucket.to_value(),
|
2022-05-14 05:01:35 +00:00
|
|
|
"region" => settings.region.to_string().to_value(),
|
2021-01-21 18:21:29 +00:00
|
|
|
"part-size" => settings.buffer_size.to_value(),
|
2021-04-30 13:05:35 +00:00
|
|
|
"uri" => {
|
2022-06-30 12:44:07 +00:00
|
|
|
let url = self.url.lock().unwrap();
|
|
|
|
let url = match *url {
|
2021-04-30 13:05:35 +00:00
|
|
|
Some(ref url) => url.to_string(),
|
|
|
|
None => "".to_string(),
|
|
|
|
};
|
|
|
|
|
|
|
|
url.to_value()
|
|
|
|
}
|
2021-09-27 13:49:12 +00:00
|
|
|
"access-key" => settings.access_key.to_value(),
|
|
|
|
"secret-access-key" => settings.secret_access_key.to_value(),
|
2022-06-16 07:16:28 +00:00
|
|
|
"session-token" => settings.session_token.to_value(),
|
2021-11-22 09:12:51 +00:00
|
|
|
"metadata" => settings.metadata.to_value(),
|
2021-12-06 12:06:10 +00:00
|
|
|
"on-error" => settings.multipart_upload_on_error.to_value(),
|
2022-05-14 05:01:35 +00:00
|
|
|
"retry-attempts" => settings.retry_attempts.to_value(),
|
|
|
|
"request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
|
2022-02-23 18:10:30 +00:00
|
|
|
"upload-part-request-timeout" => {
|
2022-05-14 05:01:35 +00:00
|
|
|
duration_to_millis(Some(settings.request_timeout)).to_value()
|
2022-03-18 10:10:18 +00:00
|
|
|
}
|
|
|
|
"complete-upload-request-timeout" => {
|
2022-05-14 05:01:35 +00:00
|
|
|
duration_to_millis(Some(settings.request_timeout)).to_value()
|
2022-03-18 10:10:18 +00:00
|
|
|
}
|
2022-05-14 05:01:35 +00:00
|
|
|
"retry-duration" | "upload-part-retry-duration" | "complete-upload-retry-duration" => {
|
|
|
|
let request_timeout = duration_to_millis(Some(settings.request_timeout));
|
|
|
|
(settings.retry_attempts as i64 * request_timeout).to_value()
|
2022-02-23 18:10:30 +00:00
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-10-23 08:57:31 +00:00
|
|
|
// Empty implementation: `S3Sink` overrides none of the `GstObjectImpl` methods.
impl GstObjectImpl for S3Sink {}
|
|
|
|
|
2021-01-21 18:21:29 +00:00
|
|
|
impl ElementImpl for S3Sink {
|
|
|
|
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
|
|
|
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
|
|
|
gst::subclass::ElementMetadata::new(
|
|
|
|
"Amazon S3 sink",
|
|
|
|
"Source/Network",
|
|
|
|
"Writes an object to Amazon S3",
|
|
|
|
"Marcin Kolny <mkolny@amazon.com>",
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
|
|
|
Some(&*ELEMENT_METADATA)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn pad_templates() -> &'static [gst::PadTemplate] {
|
|
|
|
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
|
|
|
|
let caps = gst::Caps::new_any();
|
|
|
|
let sink_pad_template = gst::PadTemplate::new(
|
|
|
|
"sink",
|
|
|
|
gst::PadDirection::Sink,
|
|
|
|
gst::PadPresence::Always,
|
|
|
|
&caps,
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
vec![sink_pad_template]
|
|
|
|
});
|
|
|
|
|
|
|
|
PAD_TEMPLATES.as_ref()
|
|
|
|
}
|
|
|
|
}
|
2019-06-12 08:07:39 +00:00
|
|
|
|
2021-04-30 13:05:35 +00:00
|
|
|
impl URIHandlerImpl for S3Sink {
|
|
|
|
const URI_TYPE: gst::URIType = gst::URIType::Sink;
|
|
|
|
|
|
|
|
fn protocols() -> &'static [&'static str] {
|
|
|
|
&["s3"]
|
|
|
|
}
|
|
|
|
|
|
|
|
fn uri(&self, _: &Self::Type) -> Option<String> {
|
|
|
|
self.url.lock().unwrap().as_ref().map(|s| s.to_string())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_uri(&self, element: &Self::Type, uri: &str) -> Result<(), glib::Error> {
|
|
|
|
self.set_uri(element, Some(uri))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-06-12 08:07:39 +00:00
|
|
|
impl BaseSinkImpl for S3Sink {
|
2020-11-14 17:24:01 +00:00
|
|
|
fn start(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
2020-04-20 01:28:30 +00:00
|
|
|
self.start()
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn stop(&self, element: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
2019-06-12 08:07:39 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
*state = State::Stopped;
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::info!(CAT, obj: element, "Stopped");
|
2019-06-12 08:07:39 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn render(
|
|
|
|
&self,
|
2020-11-14 17:24:01 +00:00
|
|
|
element: &Self::Type,
|
2019-06-12 08:07:39 +00:00
|
|
|
buffer: &gst::Buffer,
|
|
|
|
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
|
|
|
if let State::Stopped = *self.state.lock().unwrap() {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::element_error!(element, gst::CoreError::Failed, ["Not started yet"]);
|
2019-06-12 08:07:39 +00:00
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::trace!(CAT, obj: element, "Rendering {:?}", buffer);
|
2019-12-18 05:50:10 +00:00
|
|
|
let map = buffer.map_readable().map_err(|_| {
|
2020-12-20 18:43:45 +00:00
|
|
|
gst::element_error!(element, gst::CoreError::Failed, ["Failed to map buffer"]);
|
2019-06-12 08:07:39 +00:00
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
|
|
|
|
|
|
|
match self.update_buffer(&map, element) {
|
|
|
|
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
|
|
|
Err(err) => match err {
|
|
|
|
Some(error_message) => {
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::error!(
|
2019-10-31 22:34:21 +00:00
|
|
|
CAT,
|
2019-06-12 08:07:39 +00:00
|
|
|
obj: element,
|
|
|
|
"Multipart upload failed: {}",
|
|
|
|
error_message
|
|
|
|
);
|
2020-06-30 20:57:22 +00:00
|
|
|
element.post_error_message(error_message);
|
2019-06-12 08:07:39 +00:00
|
|
|
Err(gst::FlowError::Error)
|
|
|
|
}
|
|
|
|
_ => {
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::info!(CAT, obj: element, "Upload interrupted. Flushing...");
|
2019-06-12 08:07:39 +00:00
|
|
|
Err(gst::FlowError::Flushing)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn unlock(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> {
|
2019-06-12 08:07:39 +00:00
|
|
|
self.cancel();
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2020-11-14 17:24:01 +00:00
|
|
|
fn event(&self, element: &Self::Type, event: gst::Event) -> bool {
|
2019-11-24 22:00:27 +00:00
|
|
|
if let gst::EventView::Eos(_) = event.view() {
|
|
|
|
if let Err(error_message) = self.finalize_upload(element) {
|
2022-02-21 17:43:46 +00:00
|
|
|
gst::error!(
|
2019-11-24 22:00:27 +00:00
|
|
|
CAT,
|
|
|
|
obj: element,
|
|
|
|
"Failed to finalize the upload: {}",
|
|
|
|
error_message
|
|
|
|
);
|
|
|
|
return false;
|
2019-06-12 08:07:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
BaseSinkImplExt::parent_event(self, element, event)
|
|
|
|
}
|
|
|
|
}
|