2023-09-27 17:08:10 +00:00
|
|
|
// Copyright (C) 2019 Amazon.com, Inc. or its affiliates <mkolny@amazon.com>
|
|
|
|
// Copyright (C) 2023 Asymptotic Inc
|
|
|
|
// Author: Arun Raghavan <arun@asymptotic.io>
|
|
|
|
//
|
|
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
|
|
//
|
|
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
|
|
|
|
use gst::glib;
|
|
|
|
use gst::prelude::*;
|
|
|
|
use gst::subclass::prelude::*;
|
|
|
|
use gst_base::subclass::prelude::*;
|
|
|
|
|
|
|
|
use aws_sdk_s3::{
|
|
|
|
config::{self, retry::RetryConfig, Credentials, Region},
|
|
|
|
operation::put_object::builders::PutObjectFluentBuilder,
|
|
|
|
primitives::ByteStream,
|
|
|
|
Client,
|
|
|
|
};
|
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
use super::NextFile;
|
2024-01-31 15:07:56 +00:00
|
|
|
use once_cell::sync::Lazy;
|
2023-09-27 17:08:10 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::convert::From;
|
|
|
|
use std::sync::Mutex;
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
use crate::s3url::*;
|
|
|
|
use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
|
|
|
|
|
|
|
|
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
|
2023-09-28 15:36:04 +00:00
|
|
|
const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1;
|
|
|
|
const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
|
|
|
|
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
|
2023-09-28 16:38:19 +00:00
|
|
|
const DEFAULT_FLUSH_ON_ERROR: bool = false;
|
2024-04-05 08:13:10 +00:00
|
|
|
const DEFAULT_FORCE_PATH_STYLE: bool = false;
|
2024-04-17 16:11:06 +00:00
|
|
|
const DEFAULT_NEXT_FILE: NextFile = NextFile::Buffer;
|
|
|
|
const DEFAULT_MIN_KEYFRAME_DISTANCE: gst::ClockTime = gst::ClockTime::from_seconds(10);
|
2023-09-27 17:08:10 +00:00
|
|
|
|
|
|
|
// General setting for create / abort requests
|
|
|
|
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
|
|
|
|
|
|
|
struct Started {
|
|
|
|
client: Client,
|
|
|
|
buffer: Vec<u8>,
|
2023-09-28 15:36:04 +00:00
|
|
|
start_pts: Option<gst::ClockTime>,
|
|
|
|
num_buffers: u64,
|
2023-09-28 16:38:19 +00:00
|
|
|
need_flush: bool,
|
2024-04-17 16:11:06 +00:00
|
|
|
index: u64,
|
|
|
|
next_segment: Option<gst::ClockTime>,
|
|
|
|
streamheaders: Option<Vec<u8>>,
|
|
|
|
streamheaders_size: u64,
|
|
|
|
file_start_pts: Option<gst::ClockTime>,
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Started {
|
|
|
|
pub fn new(client: Client, buffer: Vec<u8>) -> Started {
|
2023-09-28 15:36:04 +00:00
|
|
|
Started {
|
|
|
|
client,
|
|
|
|
buffer,
|
|
|
|
start_pts: gst::ClockTime::NONE,
|
|
|
|
num_buffers: 0,
|
2023-09-28 16:38:19 +00:00
|
|
|
need_flush: false,
|
2024-04-17 16:11:06 +00:00
|
|
|
index: 0,
|
|
|
|
next_segment: gst::ClockTime::NONE,
|
|
|
|
streamheaders: None,
|
|
|
|
streamheaders_size: 0,
|
|
|
|
file_start_pts: gst::ClockTime::NONE,
|
2023-09-28 15:36:04 +00:00
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
enum State {
|
|
|
|
#[default]
|
|
|
|
Stopped,
|
|
|
|
Started(Started),
|
|
|
|
}
|
|
|
|
|
|
|
|
struct Settings {
|
|
|
|
region: Region,
|
|
|
|
bucket: Option<String>,
|
|
|
|
key: Option<String>,
|
2024-05-24 10:11:19 +00:00
|
|
|
cache_control: Option<String>,
|
2023-09-27 17:08:10 +00:00
|
|
|
content_type: Option<String>,
|
|
|
|
content_disposition: Option<String>,
|
2024-05-24 10:11:19 +00:00
|
|
|
content_encoding: Option<String>,
|
|
|
|
content_language: Option<String>,
|
2023-09-27 17:08:10 +00:00
|
|
|
access_key: Option<String>,
|
|
|
|
secret_access_key: Option<String>,
|
|
|
|
session_token: Option<String>,
|
|
|
|
metadata: Option<gst::Structure>,
|
|
|
|
retry_attempts: u32,
|
|
|
|
request_timeout: Duration,
|
|
|
|
endpoint_uri: Option<String>,
|
2024-04-05 08:13:10 +00:00
|
|
|
force_path_style: bool,
|
2023-09-28 15:36:04 +00:00
|
|
|
flush_interval_buffers: u64,
|
|
|
|
flush_interval_bytes: u64,
|
|
|
|
flush_interval_time: Option<gst::ClockTime>,
|
2023-09-28 16:38:19 +00:00
|
|
|
flush_on_error: bool,
|
2024-04-17 16:11:06 +00:00
|
|
|
next_file: NextFile,
|
|
|
|
min_keyframe_distance: gst::ClockTime,
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Settings {
|
|
|
|
fn to_uri(&self) -> String {
|
|
|
|
GstS3Url {
|
|
|
|
region: self.region.clone(),
|
|
|
|
bucket: self.bucket.clone().unwrap(),
|
|
|
|
object: self.key.clone().unwrap(),
|
|
|
|
version: None,
|
|
|
|
}
|
|
|
|
.to_string()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn to_metadata(&self, imp: &S3PutObjectSink) -> Option<HashMap<String, String>> {
|
|
|
|
self.metadata.as_ref().map(|structure| {
|
|
|
|
let mut hash = HashMap::new();
|
|
|
|
|
|
|
|
for (key, value) in structure.iter() {
|
|
|
|
if let Ok(Ok(value_str)) = value.transform::<String>().map(|v| v.get()) {
|
2024-07-08 08:03:11 +00:00
|
|
|
gst::log!(CAT, imp = imp, "metadata '{}' -> '{}'", key, value_str);
|
2023-09-27 17:08:10 +00:00
|
|
|
hash.insert(key.to_string(), value_str);
|
|
|
|
} else {
|
|
|
|
gst::warning!(
|
|
|
|
CAT,
|
2024-07-08 08:03:11 +00:00
|
|
|
imp = imp,
|
2023-09-27 17:08:10 +00:00
|
|
|
"Failed to convert metadata '{}' to string ('{:?}')",
|
|
|
|
key,
|
|
|
|
value
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
hash
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for Settings {
|
|
|
|
fn default() -> Self {
|
|
|
|
Settings {
|
|
|
|
region: Region::new("us-west-2"),
|
|
|
|
bucket: None,
|
|
|
|
key: None,
|
2024-05-24 10:11:19 +00:00
|
|
|
cache_control: None,
|
2023-09-27 17:08:10 +00:00
|
|
|
content_type: None,
|
|
|
|
content_disposition: None,
|
2024-05-24 10:11:19 +00:00
|
|
|
content_encoding: None,
|
|
|
|
content_language: None,
|
2023-09-27 17:08:10 +00:00
|
|
|
access_key: None,
|
|
|
|
secret_access_key: None,
|
|
|
|
session_token: None,
|
|
|
|
metadata: None,
|
|
|
|
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
|
|
|
|
request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
|
|
|
|
endpoint_uri: None,
|
2024-04-05 08:13:10 +00:00
|
|
|
force_path_style: DEFAULT_FORCE_PATH_STYLE,
|
2023-09-28 15:36:04 +00:00
|
|
|
flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS,
|
|
|
|
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
|
|
|
|
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
|
2023-09-28 16:38:19 +00:00
|
|
|
flush_on_error: DEFAULT_FLUSH_ON_ERROR,
|
2024-04-17 16:11:06 +00:00
|
|
|
next_file: DEFAULT_NEXT_FILE,
|
|
|
|
min_keyframe_distance: DEFAULT_MIN_KEYFRAME_DISTANCE,
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Default)]
|
|
|
|
pub struct S3PutObjectSink {
|
|
|
|
url: Mutex<Option<GstS3Url>>,
|
|
|
|
settings: Mutex<Settings>,
|
|
|
|
state: Mutex<State>,
|
2024-06-07 15:55:43 +00:00
|
|
|
canceller: Mutex<s3utils::Canceller>,
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Debug category under which this element logs.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let name = "awss3putobjectsink";
    let description = Some("Amazon S3 PutObject Sink");

    gst::DebugCategory::new(name, gst::DebugColorFlags::empty(), description)
});
|
|
|
|
|
|
|
|
impl S3PutObjectSink {
|
2024-04-17 16:11:06 +00:00
|
|
|
fn check_thresholds(&self, settings: &Settings, state: &Started, buffer: &gst::Buffer) -> bool {
|
|
|
|
let pts = buffer.pts();
|
|
|
|
let duration = buffer.duration();
|
2023-09-28 15:36:04 +00:00
|
|
|
|
|
|
|
#[allow(clippy::if_same_then_else)]
|
|
|
|
#[allow(clippy::needless_bool)]
|
|
|
|
// Verbose if/else form for readability
|
|
|
|
if settings.flush_interval_buffers > 0
|
|
|
|
&& (state.num_buffers % settings.flush_interval_buffers) == 0
|
|
|
|
{
|
|
|
|
true
|
|
|
|
} else if settings.flush_interval_bytes > 0
|
|
|
|
&& (state.buffer.len() as u64 % settings.flush_interval_bytes) == 0
|
|
|
|
{
|
|
|
|
true
|
|
|
|
} else if settings.flush_interval_time.is_some()
|
|
|
|
&& settings.flush_interval_time.unwrap() != DEFAULT_FLUSH_INTERVAL_TIME
|
|
|
|
&& state.start_pts.is_some()
|
|
|
|
&& pts.is_some()
|
|
|
|
&& duration.is_some()
|
|
|
|
&& (pts.unwrap() - state.start_pts.unwrap() + duration.unwrap())
|
|
|
|
% settings.flush_interval_time.unwrap()
|
|
|
|
== gst::ClockTime::from_nseconds(0)
|
|
|
|
{
|
|
|
|
true
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-09-27 17:08:10 +00:00
|
|
|
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started { .. } = *state {
|
|
|
|
unreachable!("Element should be started");
|
|
|
|
}
|
|
|
|
|
|
|
|
let s3url = {
|
|
|
|
let url = self.url.lock().unwrap();
|
|
|
|
match *url {
|
|
|
|
Some(ref url) => url.clone(),
|
|
|
|
None => {
|
|
|
|
return Err(gst::error_msg!(
|
|
|
|
gst::ResourceError::Settings,
|
|
|
|
["Cannot start without a URL being set"]
|
|
|
|
));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let timeout_config = s3utils::timeout_config(settings.request_timeout);
|
|
|
|
|
|
|
|
let cred = match (
|
|
|
|
settings.access_key.as_ref(),
|
|
|
|
settings.secret_access_key.as_ref(),
|
|
|
|
) {
|
|
|
|
(Some(access_key), Some(secret_access_key)) => Some(Credentials::new(
|
|
|
|
access_key.clone(),
|
|
|
|
secret_access_key.clone(),
|
|
|
|
settings.session_token.clone(),
|
|
|
|
None,
|
|
|
|
"aws-s3-putobject-sink",
|
|
|
|
)),
|
|
|
|
_ => None,
|
|
|
|
};
|
|
|
|
|
|
|
|
let sdk_config =
|
|
|
|
s3utils::wait_config(&self.canceller, s3url.region.clone(), timeout_config, cred)
|
|
|
|
.map_err(|err| match err {
|
|
|
|
WaitError::FutureError(err) => gst::error_msg!(
|
|
|
|
gst::ResourceError::OpenWrite,
|
|
|
|
["Failed to create SDK config: {}", err]
|
|
|
|
),
|
|
|
|
WaitError::Cancelled => {
|
|
|
|
gst::error_msg!(
|
|
|
|
gst::LibraryError::Failed,
|
|
|
|
["SDK config request interrupted during start"]
|
|
|
|
)
|
|
|
|
}
|
|
|
|
})?;
|
|
|
|
|
|
|
|
let config_builder = config::Builder::from(&sdk_config)
|
2024-04-05 08:13:10 +00:00
|
|
|
.force_path_style(settings.force_path_style)
|
2023-09-27 17:08:10 +00:00
|
|
|
.retry_config(RetryConfig::standard().with_max_attempts(settings.retry_attempts));
|
|
|
|
|
|
|
|
let config = if let Some(ref uri) = settings.endpoint_uri {
|
|
|
|
config_builder.endpoint_url(uri).build()
|
|
|
|
} else {
|
|
|
|
config_builder.build()
|
|
|
|
};
|
|
|
|
|
|
|
|
let client = Client::from_conf(config);
|
|
|
|
|
|
|
|
*state = State::Started(Started::new(client, Vec::new()));
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_uri(self: &S3PutObjectSink, url_str: Option<&str>) -> Result<(), glib::Error> {
|
|
|
|
let state = self.state.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started { .. } = *state {
|
|
|
|
return Err(glib::Error::new(
|
|
|
|
gst::URIError::BadState,
|
|
|
|
"Cannot set URI on a started s3sink",
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut url = self.url.lock().unwrap();
|
|
|
|
|
|
|
|
if url_str.is_none() {
|
|
|
|
*url = None;
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
2024-07-08 08:03:11 +00:00
|
|
|
gst::debug!(CAT, imp = self, "Setting uri to {:?}", url_str);
|
2023-09-27 17:08:10 +00:00
|
|
|
|
|
|
|
let url_str = url_str.unwrap();
|
|
|
|
match parse_s3_url(url_str) {
|
|
|
|
Ok(s3url) => {
|
|
|
|
*url = Some(s3url);
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
Err(_) => Err(glib::Error::new(
|
|
|
|
gst::URIError::BadUri,
|
|
|
|
"Could not parse URI",
|
|
|
|
)),
|
|
|
|
}
|
|
|
|
}
|
2024-04-17 16:11:06 +00:00
|
|
|
|
|
|
|
fn accumulate_buffer(
|
|
|
|
&self,
|
|
|
|
buffer: &gst::Buffer,
|
|
|
|
started_state: &mut Started,
|
|
|
|
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
|
|
|
if started_state.start_pts.is_none() {
|
|
|
|
started_state.start_pts = buffer.pts();
|
|
|
|
}
|
|
|
|
|
|
|
|
started_state.num_buffers += 1;
|
|
|
|
started_state.need_flush = true;
|
|
|
|
|
|
|
|
gst::trace!(CAT, imp = self, "Rendering {:?}", buffer);
|
|
|
|
let map = buffer.map_readable().map_err(|_| {
|
|
|
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
|
|
|
|
|
|
|
started_state.buffer.extend_from_slice(map.as_slice());
|
|
|
|
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn create_body_with_streamheaders(
|
|
|
|
&self,
|
|
|
|
next_file: NextFile,
|
|
|
|
streamheaders: &Option<Vec<u8>>,
|
|
|
|
buffer: &[u8],
|
|
|
|
) -> ByteStream {
|
|
|
|
match next_file {
|
|
|
|
NextFile::KeyFrame | NextFile::MaxSize | NextFile::MaxDuration => {
|
|
|
|
if let Some(headers) = streamheaders {
|
|
|
|
let with_sh = [&headers[..], buffer].concat();
|
|
|
|
|
|
|
|
ByteStream::from(with_sh)
|
|
|
|
} else {
|
|
|
|
ByteStream::from(buffer.to_vec())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => ByteStream::from(buffer.to_vec()),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn create_put_object_request(
|
|
|
|
&self,
|
|
|
|
started_state: &mut Started,
|
|
|
|
) -> Result<Option<PutObjectFluentBuilder>, gst::FlowError> {
|
|
|
|
let url = self.url.lock().unwrap();
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
if started_state.buffer.is_empty() {
|
|
|
|
return Ok(None);
|
|
|
|
}
|
|
|
|
|
|
|
|
let body = Some(self.create_body_with_streamheaders(
|
|
|
|
settings.next_file,
|
|
|
|
&started_state.streamheaders,
|
|
|
|
&started_state.buffer,
|
|
|
|
));
|
|
|
|
|
|
|
|
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
|
|
|
|
|
|
|
|
let object = url.as_ref().unwrap().object.to_owned();
|
|
|
|
let key = if object.contains("%0") {
|
|
|
|
match sprintf::sprintf!(&object, started_state.index) {
|
|
|
|
Ok(k) => {
|
|
|
|
/* Equivalent to opening a new file */
|
|
|
|
started_state.index += 1;
|
|
|
|
started_state.buffer = Vec::new();
|
|
|
|
|
|
|
|
Some(k)
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
gst::element_imp_error!(
|
|
|
|
self,
|
|
|
|
gst::CoreError::Failed,
|
|
|
|
["Failed to format file name: {}", e]
|
|
|
|
);
|
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
Some(object)
|
|
|
|
};
|
|
|
|
let metadata = settings.to_metadata(self);
|
|
|
|
let client = &started_state.client;
|
|
|
|
|
|
|
|
Ok(Some(
|
|
|
|
client
|
|
|
|
.put_object()
|
|
|
|
.set_body(body)
|
|
|
|
.set_bucket(bucket)
|
|
|
|
.set_key(key)
|
|
|
|
.set_metadata(metadata),
|
|
|
|
))
|
|
|
|
}
|
|
|
|
|
|
|
|
fn to_write_next_file(
|
|
|
|
&self,
|
|
|
|
started_state: &mut Started,
|
|
|
|
buffer: &gst::Buffer,
|
|
|
|
buffer_size: u64,
|
|
|
|
) -> bool {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
let next_file = settings.next_file;
|
|
|
|
let max_file_size = settings.flush_interval_bytes;
|
|
|
|
let max_file_duration = settings.flush_interval_time;
|
|
|
|
let min_keyframe_distance = settings.min_keyframe_distance;
|
|
|
|
|
|
|
|
match next_file {
|
|
|
|
NextFile::Buffer => self.check_thresholds(&settings, started_state, buffer),
|
|
|
|
NextFile::MaxSize => {
|
|
|
|
started_state.buffer.len() as u64 + started_state.streamheaders_size + buffer_size
|
|
|
|
> max_file_size
|
|
|
|
}
|
|
|
|
NextFile::MaxDuration => {
|
|
|
|
let mut new_duration = gst::ClockTime::ZERO;
|
|
|
|
|
|
|
|
if buffer.pts().is_some() && started_state.file_start_pts.is_some() {
|
|
|
|
new_duration = buffer.pts().unwrap() - started_state.file_start_pts.unwrap();
|
|
|
|
|
|
|
|
if buffer.duration().is_some() {
|
|
|
|
new_duration += buffer.duration().unwrap();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
started_state.file_start_pts = match buffer.pts() {
|
|
|
|
Some(pts) => Some(pts),
|
|
|
|
None => started_state.file_start_pts,
|
|
|
|
};
|
|
|
|
|
|
|
|
new_duration > max_file_duration.unwrap()
|
|
|
|
}
|
|
|
|
NextFile::KeyFrame => {
|
|
|
|
if started_state.next_segment == gst::ClockTime::NONE && buffer.pts().is_some() {
|
|
|
|
started_state.next_segment =
|
|
|
|
Some(buffer.pts().unwrap() + min_keyframe_distance);
|
|
|
|
}
|
|
|
|
|
|
|
|
if buffer.pts().is_some() {
|
|
|
|
let buffer_ts = buffer.pts().unwrap();
|
|
|
|
let delta_unit = buffer.flags().contains(gst::BufferFlags::DELTA_UNIT);
|
|
|
|
let next_segment = started_state
|
|
|
|
.next_segment
|
|
|
|
.expect("Next segment must be valid here");
|
|
|
|
|
|
|
|
if buffer_ts >= next_segment && !delta_unit {
|
|
|
|
started_state.next_segment = Some(next_segment + min_keyframe_distance);
|
|
|
|
|
|
|
|
true
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
NextFile::Discont => buffer.flags().contains(gst::BufferFlags::DISCONT),
|
|
|
|
NextFile::KeyUnitEvent => false, // Next file will be opened on KeyUnitEvent
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_put_object_request(
|
|
|
|
&self,
|
|
|
|
started_state: &mut Started,
|
|
|
|
) -> Result<gst::FlowSuccess, gst::FlowError> {
|
|
|
|
let req = self.create_put_object_request(started_state)?;
|
|
|
|
|
|
|
|
if let Some(put_object_req) = req {
|
|
|
|
let put_object_req_future = put_object_req.send();
|
|
|
|
|
|
|
|
match s3utils::wait(&self.canceller, put_object_req_future) {
|
|
|
|
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
|
|
|
Err(err) => match err {
|
|
|
|
WaitError::Cancelled => Ok(gst::FlowSuccess::Ok),
|
|
|
|
WaitError::FutureError(e) => {
|
|
|
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["{e}"]);
|
|
|
|
Err(gst::FlowError::Error)
|
|
|
|
}
|
|
|
|
},
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
Ok(gst::FlowSuccess::Ok)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn write_buffer(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let started_state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
|
|
|
|
return Err(gst::FlowError::Error);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let map = buffer.map_readable().map_err(|_| {
|
|
|
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
|
|
|
gst::FlowError::Error
|
|
|
|
})?;
|
|
|
|
|
|
|
|
if self.to_write_next_file(started_state, buffer, map.size() as u64) {
|
|
|
|
self.write_put_object_request(started_state)?;
|
|
|
|
}
|
|
|
|
|
|
|
|
self.accumulate_buffer(buffer, started_state)
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
#[glib::object_subclass]
|
|
|
|
impl ObjectSubclass for S3PutObjectSink {
|
|
|
|
const NAME: &'static str = "GstAwsS3PutObjectSink";
|
|
|
|
type Type = super::S3PutObjectSink;
|
|
|
|
type ParentType = gst_base::BaseSink;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ObjectImpl for S3PutObjectSink {
|
|
|
|
fn properties() -> &'static [glib::ParamSpec] {
|
|
|
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
|
|
|
vec![
|
|
|
|
glib::ParamSpecString::builder("bucket")
|
|
|
|
.nick("S3 Bucket")
|
|
|
|
.blurb("The bucket of the file to write")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("key")
|
|
|
|
.nick("S3 Key")
|
|
|
|
.blurb("The key of the file to write")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("region")
|
|
|
|
.nick("AWS Region")
|
|
|
|
.blurb("An AWS region (e.g. eu-west-2).")
|
|
|
|
.default_value(Some("us-west-2"))
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("uri")
|
|
|
|
.nick("URI")
|
|
|
|
.blurb("The S3 object URI")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("access-key")
|
|
|
|
.nick("Access Key")
|
|
|
|
.blurb("AWS Access Key")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("secret-access-key")
|
|
|
|
.nick("Secret Access Key")
|
|
|
|
.blurb("AWS Secret Access Key")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("session-token")
|
|
|
|
.nick("Session Token")
|
|
|
|
.blurb("AWS temporary Session Token from STS")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecBoxed::builder::<gst::Structure>("metadata")
|
|
|
|
.nick("Metadata")
|
|
|
|
.blurb("A map of metadata to store with the object in S3; field values need to be convertible to strings.")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecUInt::builder("retry-attempts")
|
|
|
|
.nick("Retry attempts")
|
|
|
|
.blurb("Number of times AWS SDK attempts a request before abandoning the request")
|
|
|
|
.minimum(1)
|
|
|
|
.maximum(10)
|
|
|
|
.default_value(DEFAULT_RETRY_ATTEMPTS)
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecInt64::builder("request-timeout")
|
|
|
|
.nick("Request timeout")
|
|
|
|
.blurb("Timeout for general S3 requests (in ms, set to -1 for infinity)")
|
|
|
|
.minimum(-1)
|
|
|
|
.default_value(DEFAULT_REQUEST_TIMEOUT_MSEC as i64)
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("endpoint-uri")
|
|
|
|
.nick("S3 endpoint URI")
|
|
|
|
.blurb("The S3 endpoint URI to use")
|
|
|
|
.build(),
|
2024-05-24 10:11:19 +00:00
|
|
|
glib::ParamSpecString::builder("cache-control")
|
|
|
|
.nick("cache-control")
|
|
|
|
.blurb("Cache-Control header to set for uploaded object")
|
|
|
|
.build(),
|
2023-09-27 17:08:10 +00:00
|
|
|
glib::ParamSpecString::builder("content-type")
|
|
|
|
.nick("content-type")
|
|
|
|
.blurb("Content-Type header to set for uploaded object")
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("content-disposition")
|
|
|
|
.nick("content-disposition")
|
|
|
|
.blurb("Content-Disposition header to set for uploaded object")
|
|
|
|
.build(),
|
2024-05-24 10:11:19 +00:00
|
|
|
glib::ParamSpecString::builder("content-encoding")
|
|
|
|
.nick("content-encoding")
|
|
|
|
.blurb("Content-Encoding header to set for uploaded object")
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecString::builder("content-language")
|
|
|
|
.nick("content-language")
|
|
|
|
.blurb("Content-Language header to set for uploaded object")
|
|
|
|
.build(),
|
2023-09-28 15:36:04 +00:00
|
|
|
glib::ParamSpecUInt64::builder("flush-interval-buffers")
|
|
|
|
.nick("Flush interval in buffers")
|
|
|
|
.blurb("Number of buffers to accumulate before doing a write (0 => disable)")
|
|
|
|
.default_value(DEFAULT_FLUSH_INTERVAL_BUFFERS)
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecUInt64::builder("flush-interval-bytes")
|
|
|
|
.nick("Flush interval in bytes")
|
|
|
|
.blurb("Number of bytes to accumulate before doing a write (0 => disable)")
|
|
|
|
.default_value(DEFAULT_FLUSH_INTERVAL_BYTES)
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecUInt64::builder("flush-interval-time")
|
|
|
|
.nick("Flush interval in duration")
|
|
|
|
.blurb("Total duration of buffers to accumulate before doing a write (0 => disable)")
|
|
|
|
.default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds())
|
|
|
|
.build(),
|
2023-09-28 16:38:19 +00:00
|
|
|
glib::ParamSpecBoolean::builder("flush-on-error")
|
|
|
|
.nick("Flush on error")
|
|
|
|
.blurb("Whether to write out the data on error (like stopping without an EOS)")
|
|
|
|
.default_value(DEFAULT_FLUSH_ON_ERROR)
|
|
|
|
.build(),
|
2024-04-05 08:13:10 +00:00
|
|
|
glib::ParamSpecBoolean::builder("force-path-style")
|
|
|
|
.nick("Force path style")
|
|
|
|
.blurb("Force client to use path-style addressing for buckets")
|
|
|
|
.default_value(DEFAULT_FORCE_PATH_STYLE)
|
|
|
|
.build(),
|
2024-04-17 16:11:06 +00:00
|
|
|
glib::ParamSpecEnum::builder_with_default("next-file", DEFAULT_NEXT_FILE)
|
|
|
|
.nick("Next File")
|
|
|
|
.blurb("When to start new file")
|
|
|
|
.mutable_ready()
|
|
|
|
.build(),
|
|
|
|
glib::ParamSpecUInt64::builder("min-keyframe-distance")
|
|
|
|
.nick("Minimum keyframe distance")
|
|
|
|
.blurb("Minimum distance between keyframes to start a new file")
|
|
|
|
.default_value(DEFAULT_MIN_KEYFRAME_DISTANCE.into())
|
|
|
|
.build(),
|
2023-09-27 17:08:10 +00:00
|
|
|
]
|
|
|
|
});
|
|
|
|
|
|
|
|
PROPERTIES.as_ref()
|
|
|
|
}
|
|
|
|
|
|
|
|
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
|
|
|
let mut settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
gst::debug!(
|
|
|
|
CAT,
|
2024-07-08 08:03:11 +00:00
|
|
|
imp = self,
|
2023-09-27 17:08:10 +00:00
|
|
|
"Setting property '{}' to '{:?}'",
|
|
|
|
pspec.name(),
|
|
|
|
value
|
|
|
|
);
|
|
|
|
|
|
|
|
match pspec.name() {
|
|
|
|
"bucket" => {
|
|
|
|
settings.bucket = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
if settings.key.is_some() {
|
|
|
|
let _ = self.set_uri(Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"key" => {
|
|
|
|
settings.key = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
if settings.bucket.is_some() {
|
|
|
|
let _ = self.set_uri(Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"region" => {
|
|
|
|
let region = value.get::<String>().expect("type checked upstream");
|
|
|
|
settings.region = Region::new(region);
|
|
|
|
if settings.key.is_some() && settings.bucket.is_some() {
|
|
|
|
let _ = self.set_uri(Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
|
|
|
"uri" => {
|
|
|
|
let _ = self.set_uri(value.get().expect("type checked upstream"));
|
|
|
|
}
|
|
|
|
"access-key" => {
|
|
|
|
settings.access_key = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"secret-access-key" => {
|
|
|
|
settings.secret_access_key = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"session-token" => {
|
|
|
|
settings.session_token = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"metadata" => {
|
|
|
|
settings.metadata = value.get().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"retry-attempts" => {
|
|
|
|
settings.retry_attempts = value.get::<u32>().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"request-timeout" => {
|
|
|
|
settings.request_timeout =
|
|
|
|
duration_from_millis(value.get::<i64>().expect("type checked upstream"));
|
|
|
|
}
|
|
|
|
"endpoint-uri" => {
|
|
|
|
settings.endpoint_uri = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
if settings.key.is_some() && settings.bucket.is_some() {
|
|
|
|
let _ = self.set_uri(Some(&settings.to_uri()));
|
|
|
|
}
|
|
|
|
}
|
2024-05-24 10:11:19 +00:00
|
|
|
"cache-control" => {
|
|
|
|
settings.cache_control = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
"content-type" => {
|
|
|
|
settings.content_type = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"content-disposition" => {
|
|
|
|
settings.content_disposition = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
2024-05-24 10:11:19 +00:00
|
|
|
"content-encoding" => {
|
|
|
|
settings.content_encoding = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"content-language" => {
|
|
|
|
settings.content_language = value
|
|
|
|
.get::<Option<String>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
2023-09-28 15:36:04 +00:00
|
|
|
"flush-interval-buffers" => {
|
|
|
|
settings.flush_interval_buffers =
|
|
|
|
value.get::<u64>().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"flush-interval-bytes" => {
|
|
|
|
settings.flush_interval_bytes = value.get::<u64>().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"flush-interval-time" => {
|
|
|
|
settings.flush_interval_time = value
|
|
|
|
.get::<Option<gst::ClockTime>>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
2023-09-28 16:38:19 +00:00
|
|
|
"flush-on-error" => {
|
|
|
|
settings.flush_on_error = value.get::<bool>().expect("type checked upstream");
|
|
|
|
}
|
2024-04-05 08:13:10 +00:00
|
|
|
"force-path-style" => {
|
|
|
|
settings.force_path_style = value.get::<bool>().expect("type checked upstream");
|
|
|
|
}
|
2024-04-17 16:11:06 +00:00
|
|
|
"next-file" => {
|
|
|
|
settings.next_file = value.get::<NextFile>().expect("type checked upstream");
|
|
|
|
}
|
|
|
|
"min-keyframe-distance" => {
|
|
|
|
settings.min_keyframe_distance = value
|
|
|
|
.get::<gst::ClockTime>()
|
|
|
|
.expect("type checked upstream");
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
|
|
|
match pspec.name() {
|
|
|
|
"key" => settings.key.to_value(),
|
|
|
|
"bucket" => settings.bucket.to_value(),
|
|
|
|
"region" => settings.region.to_string().to_value(),
|
|
|
|
"uri" => {
|
|
|
|
let url = self.url.lock().unwrap();
|
|
|
|
let url = match *url {
|
|
|
|
Some(ref url) => url.to_string(),
|
|
|
|
None => "".to_string(),
|
|
|
|
};
|
|
|
|
|
|
|
|
url.to_value()
|
|
|
|
}
|
|
|
|
"access-key" => settings.access_key.to_value(),
|
|
|
|
"secret-access-key" => settings.secret_access_key.to_value(),
|
|
|
|
"session-token" => settings.session_token.to_value(),
|
|
|
|
"metadata" => settings.metadata.to_value(),
|
|
|
|
"retry-attempts" => settings.retry_attempts.to_value(),
|
|
|
|
"request-timeout" => duration_to_millis(Some(settings.request_timeout)).to_value(),
|
|
|
|
"endpoint-uri" => settings.endpoint_uri.to_value(),
|
2024-05-24 10:11:19 +00:00
|
|
|
"cache-control" => settings.cache_control.to_value(),
|
2023-09-27 17:08:10 +00:00
|
|
|
"content-type" => settings.content_type.to_value(),
|
|
|
|
"content-disposition" => settings.content_disposition.to_value(),
|
2024-05-24 10:11:19 +00:00
|
|
|
"content-encoding" => settings.content_encoding.to_value(),
|
|
|
|
"content-language" => settings.content_language.to_value(),
|
2023-09-28 15:36:04 +00:00
|
|
|
"flush-interval-buffers" => settings.flush_interval_buffers.to_value(),
|
|
|
|
"flush-interval-bytes" => settings.flush_interval_bytes.to_value(),
|
|
|
|
"flush-interval-time" => settings.flush_interval_time.to_value(),
|
2023-09-28 16:38:19 +00:00
|
|
|
"flush-on-error" => settings.flush_on_error.to_value(),
|
2024-04-05 08:13:10 +00:00
|
|
|
"force-path-style" => settings.force_path_style.to_value(),
|
2024-04-17 16:11:06 +00:00
|
|
|
"min-keyframe-distance" => settings.min_keyframe_distance.to_value(),
|
|
|
|
"next-file" => settings.next_file.to_value(),
|
2023-09-27 17:08:10 +00:00
|
|
|
_ => unimplemented!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// No `GstObjectImpl` methods are overridden; the empty impl only satisfies the trait requirement.
impl GstObjectImpl for S3PutObjectSink {}
|
|
|
|
|
|
|
|
impl ElementImpl for S3PutObjectSink {
|
|
|
|
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
|
|
|
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
2024-04-17 16:11:06 +00:00
|
|
|
#[cfg(feature = "doc")]
|
|
|
|
NextFile::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
|
2023-09-27 17:08:10 +00:00
|
|
|
gst::subclass::ElementMetadata::new(
|
|
|
|
"Amazon S3 PutObject sink",
|
|
|
|
"Source/Network",
|
|
|
|
"Writes an object to Amazon S3 using PutObject (mostly useful for small files)",
|
|
|
|
"Arun Raghavan <arun@asymptotic.io>",
|
|
|
|
)
|
|
|
|
});
|
|
|
|
|
|
|
|
Some(&*ELEMENT_METADATA)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn pad_templates() -> &'static [gst::PadTemplate] {
|
|
|
|
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
|
|
|
|
let caps = gst::Caps::new_any();
|
|
|
|
let sink_pad_template = gst::PadTemplate::new(
|
|
|
|
"sink",
|
|
|
|
gst::PadDirection::Sink,
|
|
|
|
gst::PadPresence::Always,
|
|
|
|
&caps,
|
|
|
|
)
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
vec![sink_pad_template]
|
|
|
|
});
|
|
|
|
|
|
|
|
PAD_TEMPLATES.as_ref()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl BaseSinkImpl for S3PutObjectSink {
|
|
|
|
fn start(&self) -> Result<(), gst::ErrorMessage> {
    // Forwards to the `start()` method of `S3PutObjectSink` defined in this file.
    let started = self.start();
    started
}
|
|
|
|
|
|
|
|
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
2023-09-28 16:38:19 +00:00
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
if let State::Started(ref mut started_state) = *state {
|
2023-09-28 16:38:19 +00:00
|
|
|
if settings.flush_on_error && started_state.need_flush {
|
|
|
|
drop(settings);
|
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
if self.write_put_object_request(started_state).is_err() {
|
|
|
|
gst::error!(CAT, imp = self, "Failed to finalize the next-file upload",);
|
2023-09-28 16:38:19 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
|
|
|
|
*state = State::Stopped;
|
2024-04-17 16:11:06 +00:00
|
|
|
|
2024-07-08 08:03:11 +00:00
|
|
|
gst::info!(CAT, imp = self, "Stopped");
|
2023-09-27 17:08:10 +00:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
|
2024-04-17 16:11:06 +00:00
|
|
|
self.write_buffer(buffer)
|
|
|
|
}
|
2023-09-28 15:36:04 +00:00
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
fn event(&self, event: gst::Event) -> bool {
|
|
|
|
use gst::EventView;
|
2023-09-28 15:36:04 +00:00
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
match event.view() {
|
|
|
|
EventView::CustomDownstream(ev) => {
|
|
|
|
let settings = self.settings.lock().unwrap();
|
|
|
|
let next_file = settings.next_file;
|
|
|
|
let is_next_key_unit_event = next_file == NextFile::KeyUnitEvent;
|
|
|
|
drop(settings);
|
2023-09-27 17:08:10 +00:00
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
if is_next_key_unit_event && gst_video::ForceKeyUnitEvent::is(ev) {
|
|
|
|
use gst_video::DownstreamForceKeyUnitEvent;
|
|
|
|
|
|
|
|
match DownstreamForceKeyUnitEvent::parse(ev) {
|
|
|
|
Ok(_key_unit_event) => {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started(ref mut started_state) = *state {
|
|
|
|
if let Err(e) = self.write_put_object_request(started_state) {
|
|
|
|
gst::element_imp_error!(
|
|
|
|
self,
|
|
|
|
gst::CoreError::Failed,
|
|
|
|
["Failed to write on KeyUnitEvent, {e}"]
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Err(e) => gst::error!(CAT, "Failed to parse key unit event: {e}"),
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
2024-04-17 16:11:06 +00:00
|
|
|
}
|
|
|
|
EventView::Eos(_) => {
|
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
|
|
|
|
if let State::Started(ref mut started_state) = *state {
|
|
|
|
started_state.need_flush = false;
|
|
|
|
|
|
|
|
if self.write_put_object_request(started_state).is_err() {
|
|
|
|
gst::element_imp_error!(
|
|
|
|
self,
|
|
|
|
gst::CoreError::Failed,
|
|
|
|
["Failed to finalize the upload"]
|
|
|
|
);
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
2024-04-17 16:11:06 +00:00
|
|
|
}
|
|
|
|
_ => (),
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
2024-04-17 16:11:06 +00:00
|
|
|
|
|
|
|
BaseSinkImplExt::parent_event(self, event)
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
2024-06-07 15:55:43 +00:00
|
|
|
let mut canceller = self.canceller.lock().unwrap();
|
|
|
|
canceller.abort();
|
|
|
|
Ok(())
|
|
|
|
}
|
2023-09-27 17:08:10 +00:00
|
|
|
|
2024-06-07 15:55:43 +00:00
|
|
|
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
|
|
|
|
let mut canceller = self.canceller.lock().unwrap();
|
|
|
|
*canceller = s3utils::Canceller::None;
|
2023-09-27 17:08:10 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
|
|
|
|
let s = caps
|
|
|
|
.structure(0)
|
|
|
|
.ok_or(gst::loggable_error!(CAT, "Missing caps in set_caps"))?;
|
2023-09-28 16:38:19 +00:00
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
if let Ok(Some(streamheaders)) = s.get_optional::<gst::ArrayRef>("streamheader") {
|
|
|
|
if streamheaders.is_empty() {
|
|
|
|
return Ok(());
|
2023-09-28 16:38:19 +00:00
|
|
|
}
|
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
let streamheaders = streamheaders.as_slice();
|
|
|
|
let mut headers: Vec<u8> = Vec::new();
|
2023-09-28 16:38:19 +00:00
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
let started_state = match *state {
|
|
|
|
State::Started(ref mut started_state) => started_state,
|
|
|
|
State::Stopped => {
|
|
|
|
return Err(gst::loggable_error!(CAT, "Element should be started"));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
started_state.streamheaders_size = 0;
|
|
|
|
|
|
|
|
for header in streamheaders {
|
|
|
|
let buffer = header.get::<Option<gst::Buffer>>();
|
|
|
|
|
|
|
|
if let Ok(Some(buf)) = buffer {
|
|
|
|
let map = buf.map_readable().map_err(|_| {
|
|
|
|
gst::element_imp_error!(
|
|
|
|
self,
|
|
|
|
gst::CoreError::Failed,
|
|
|
|
["Failed to map streamheader buffer"]
|
|
|
|
);
|
|
|
|
gst::loggable_error!(CAT, "Failed to map streamheader buffer")
|
|
|
|
})?;
|
|
|
|
|
|
|
|
headers.extend_from_slice(map.as_slice());
|
|
|
|
|
|
|
|
started_state.streamheaders_size += map.size() as u64;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if !headers.is_empty() {
|
|
|
|
let _ = started_state.streamheaders.take();
|
|
|
|
gst::info!(CAT, imp = self, "Got streamheaders");
|
|
|
|
started_state.streamheaders = Some(headers);
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-04-17 16:11:06 +00:00
|
|
|
self.parent_set_caps(caps)
|
2023-09-27 17:08:10 +00:00
|
|
|
}
|
|
|
|
}
|