aws: Add next-file support to putobjectsink

Add `next-file` support to `awss3putobjectsink` on similar lines to
the `next-file` support in `multifilesink`.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1550>
This commit is contained in:
Sanchayan Maity 2024-04-17 21:41:06 +05:30 committed by GStreamer Marge Bot
parent d274caeb35
commit f3206c2e1a
5 changed files with 446 additions and 129 deletions

13
Cargo.lock generated
View file

@ -2302,12 +2302,14 @@ dependencies = [
"gstreamer-audio",
"gstreamer-base",
"gstreamer-check",
"gstreamer-video",
"once_cell",
"percent-encoding",
"rand",
"serde",
"serde_derive",
"serde_json",
"sprintf 0.2.1",
"test-with",
"tokio",
"url",
@ -2542,7 +2544,7 @@ dependencies = [
"gstreamer-video",
"m3u8-rs",
"once_cell",
"sprintf",
"sprintf 0.1.4",
]
[[package]]
@ -6617,6 +6619,15 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c0cdea5a20a06e7c57f627094e7b1618e5665592cd88f2d45fa4014e348db58"
[[package]]
name = "sprintf"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2819cb5194dfe9e6d102f4519a9fb9dc7106d2879b71b4fd4d4677f1175bd39"
dependencies = [
"thiserror",
]
[[package]]
name = "static_assertions"
version = "1.1.0"

View file

@ -398,6 +398,32 @@
"type": "GstStructure",
"writable": true
},
"min-keyframe-distance": {
"blurb": "Minimum distance between keyframes to start a new file",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "10000000000",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"next-file": {
"blurb": "When to start new file",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "next-buffer (0)",
"mutable": "ready",
"readable": true,
"type": "GstS3PutObjectSinkNextFile",
"writable": true
},
"region": {
"blurb": "An AWS region (e.g. eu-west-2).",
"conditionally-available": false,
@ -1255,6 +1281,41 @@
}
]
},
"GstS3PutObjectSinkNextFile": {
"kind": "enum",
"values": [
{
"desc": "New file for each buffer",
"name": "next-buffer",
"value": "0"
},
{
"desc": "New file after each discontinuity",
"name": "next-discont",
"value": "1"
},
{
"desc": "New file at each key frame",
"name": "next-key-frame",
"value": "2"
},
{
"desc": "New file after a force key unit event",
"name": "next-key-unit-event",
"value": "3"
},
{
"desc": "New file when the configured maximum file size would be exceeded with the next buffer or buffer list",
"name": "next-max-size",
"value": "4"
},
{
"desc": "New file when the configured maximum duration would be exceeded with the next buffer or buffer list",
"name": "next-max-duration",
"value": "5"
}
]
},
"GstS3SinkOnError": {
"kind": "enum",
"values": [

View file

@ -32,6 +32,8 @@ serde_derive = "1"
serde_json = "1"
url = "2"
once_cell.workspace = true
gst-video = { workspace = true, features = ["v1_22"] }
sprintf = "0.2"
[dev-dependencies]
chrono = { version = "0.4", features = [ "alloc" ] }

View file

@ -14,6 +14,33 @@ use gst::prelude::*;
mod multipartsink;
mod putobjectsink;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3PutObjectSinkNextFile")]
pub(crate) enum NextFile {
#[enum_value(name = "New file for each buffer", nick = "next-buffer")]
Buffer,
#[enum_value(name = "New file after each discontinuity", nick = "next-discont")]
Discont,
#[enum_value(name = "New file at each key frame", nick = "next-key-frame")]
KeyFrame,
#[enum_value(
name = "New file after a force key unit event",
nick = "next-key-unit-event"
)]
KeyUnitEvent,
#[enum_value(
name = "New file when the configured maximum file size would be exceeded with the next buffer or buffer list",
nick = "next-max-size"
)]
MaxSize,
#[enum_value(
name = "New file when the configured maximum duration would be exceeded with the next buffer or buffer list",
nick = "next-max-duration"
)]
MaxDuration,
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3SinkOnError")]

View file

@ -20,6 +20,7 @@ use aws_sdk_s3::{
Client,
};
use super::NextFile;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::convert::From;
@ -35,6 +36,8 @@ const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
const DEFAULT_FLUSH_ON_ERROR: bool = false;
const DEFAULT_FORCE_PATH_STYLE: bool = false;
const DEFAULT_NEXT_FILE: NextFile = NextFile::Buffer;
const DEFAULT_MIN_KEYFRAME_DISTANCE: gst::ClockTime = gst::ClockTime::from_seconds(10);
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
@ -45,6 +48,11 @@ struct Started {
start_pts: Option<gst::ClockTime>,
num_buffers: u64,
need_flush: bool,
index: u64,
next_segment: Option<gst::ClockTime>,
streamheaders: Option<Vec<u8>>,
streamheaders_size: u64,
file_start_pts: Option<gst::ClockTime>,
}
impl Started {
@ -55,6 +63,11 @@ impl Started {
start_pts: gst::ClockTime::NONE,
num_buffers: 0,
need_flush: false,
index: 0,
next_segment: gst::ClockTime::NONE,
streamheaders: None,
streamheaders_size: 0,
file_start_pts: gst::ClockTime::NONE,
}
}
}
@ -87,6 +100,8 @@ struct Settings {
flush_interval_bytes: u64,
flush_interval_time: Option<gst::ClockTime>,
flush_on_error: bool,
next_file: NextFile,
min_keyframe_distance: gst::ClockTime,
}
impl Settings {
@ -147,6 +162,8 @@ impl Default for Settings {
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
flush_on_error: DEFAULT_FLUSH_ON_ERROR,
next_file: DEFAULT_NEXT_FILE,
min_keyframe_distance: DEFAULT_MIN_KEYFRAME_DISTANCE,
}
}
}
@ -168,13 +185,9 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl S3PutObjectSink {
fn check_thresholds(
&self,
state: &Started,
pts: Option<gst::ClockTime>,
duration: Option<gst::ClockTime>,
) -> bool {
let settings = self.settings.lock().unwrap();
fn check_thresholds(&self, settings: &Settings, state: &Started, buffer: &gst::Buffer) -> bool {
let pts = buffer.pts();
let duration = buffer.duration();
#[allow(clippy::if_same_then_else)]
#[allow(clippy::needless_bool)]
@ -202,61 +215,6 @@ impl S3PutObjectSink {
}
}
fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
let put_object_req = self.create_put_object_request();
let put_object_req_future = put_object_req.send();
let _output =
s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match &err {
WaitError::FutureError(_) => Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload object: {err}"]
)),
WaitError::Cancelled => None,
})?;
gst::debug!(CAT, imp = self, "Upload complete");
Ok(())
}
fn create_put_object_request(&self) -> PutObjectFluentBuilder {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let state = match *state {
State::Started(ref started_state) => started_state,
State::Stopped => {
unreachable!("Element should be started");
}
};
let body = Some(ByteStream::from(state.buffer.clone()));
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
let key = Some(url.as_ref().unwrap().object.to_owned());
let cache_control = settings.cache_control.clone();
let content_type = settings.content_type.clone();
let content_disposition = settings.content_disposition.clone();
let content_encoding = settings.content_encoding.clone();
let content_language = settings.content_language.clone();
let metadata = settings.to_metadata(self);
let client = &state.client;
client
.put_object()
.set_body(body)
.set_bucket(bucket)
.set_cache_control(cache_control)
.set_content_disposition(content_disposition)
.set_content_encoding(content_encoding)
.set_content_type(content_type)
.set_content_language(content_language)
.set_key(key)
.set_metadata(metadata)
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
@ -357,6 +315,214 @@ impl S3PutObjectSink {
)),
}
}
fn accumulate_buffer(
&self,
buffer: &gst::Buffer,
started_state: &mut Started,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if started_state.start_pts.is_none() {
started_state.start_pts = buffer.pts();
}
started_state.num_buffers += 1;
started_state.need_flush = true;
gst::trace!(CAT, imp = self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
started_state.buffer.extend_from_slice(map.as_slice());
Ok(gst::FlowSuccess::Ok)
}
fn create_body_with_streamheaders(
&self,
next_file: NextFile,
streamheaders: &Option<Vec<u8>>,
buffer: &[u8],
) -> ByteStream {
match next_file {
NextFile::KeyFrame | NextFile::MaxSize | NextFile::MaxDuration => {
if let Some(headers) = streamheaders {
let with_sh = [&headers[..], buffer].concat();
ByteStream::from(with_sh)
} else {
ByteStream::from(buffer.to_vec())
}
}
_ => ByteStream::from(buffer.to_vec()),
}
}
fn create_put_object_request(
&self,
started_state: &mut Started,
) -> Result<Option<PutObjectFluentBuilder>, gst::FlowError> {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
if started_state.buffer.is_empty() {
return Ok(None);
}
let body = Some(self.create_body_with_streamheaders(
settings.next_file,
&started_state.streamheaders,
&started_state.buffer,
));
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
let object = url.as_ref().unwrap().object.to_owned();
let key = if object.contains("%0") {
match sprintf::sprintf!(&object, started_state.index) {
Ok(k) => {
/* Equivalent to opening a new file */
started_state.index += 1;
started_state.buffer = Vec::new();
Some(k)
}
Err(e) => {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to format file name: {}", e]
);
return Err(gst::FlowError::Error);
}
}
} else {
Some(object)
};
let metadata = settings.to_metadata(self);
let client = &started_state.client;
Ok(Some(
client
.put_object()
.set_body(body)
.set_bucket(bucket)
.set_key(key)
.set_metadata(metadata),
))
}
fn to_write_next_file(
&self,
started_state: &mut Started,
buffer: &gst::Buffer,
buffer_size: u64,
) -> bool {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
let max_file_size = settings.flush_interval_bytes;
let max_file_duration = settings.flush_interval_time;
let min_keyframe_distance = settings.min_keyframe_distance;
match next_file {
NextFile::Buffer => self.check_thresholds(&settings, started_state, buffer),
NextFile::MaxSize => {
started_state.buffer.len() as u64 + started_state.streamheaders_size + buffer_size
> max_file_size
}
NextFile::MaxDuration => {
let mut new_duration = gst::ClockTime::ZERO;
if buffer.pts().is_some() && started_state.file_start_pts.is_some() {
new_duration = buffer.pts().unwrap() - started_state.file_start_pts.unwrap();
if buffer.duration().is_some() {
new_duration += buffer.duration().unwrap();
}
}
started_state.file_start_pts = match buffer.pts() {
Some(pts) => Some(pts),
None => started_state.file_start_pts,
};
new_duration > max_file_duration.unwrap()
}
NextFile::KeyFrame => {
if started_state.next_segment == gst::ClockTime::NONE && buffer.pts().is_some() {
started_state.next_segment =
Some(buffer.pts().unwrap() + min_keyframe_distance);
}
if buffer.pts().is_some() {
let buffer_ts = buffer.pts().unwrap();
let delta_unit = buffer.flags().contains(gst::BufferFlags::DELTA_UNIT);
let next_segment = started_state
.next_segment
.expect("Next segment must be valid here");
if buffer_ts >= next_segment && !delta_unit {
started_state.next_segment = Some(next_segment + min_keyframe_distance);
true
} else {
false
}
} else {
false
}
}
NextFile::Discont => buffer.flags().contains(gst::BufferFlags::DISCONT),
NextFile::KeyUnitEvent => false, // Next file will be opened on KeyUnitEvent
}
}
fn write_put_object_request(
&self,
started_state: &mut Started,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let req = self.create_put_object_request(started_state)?;
if let Some(put_object_req) = req {
let put_object_req_future = put_object_req.send();
match s3utils::wait(&self.canceller, put_object_req_future) {
Ok(_) => Ok(gst::FlowSuccess::Ok),
Err(err) => match err {
WaitError::Cancelled => Ok(gst::FlowSuccess::Ok),
WaitError::FutureError(e) => {
gst::element_imp_error!(self, gst::CoreError::Failed, ["{e}"]);
Err(gst::FlowError::Error)
}
},
}
} else {
Ok(gst::FlowSuccess::Ok)
}
}
fn write_buffer(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
}
};
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
if self.to_write_next_file(started_state, buffer, map.size() as u64) {
self.write_put_object_request(started_state)?;
}
self.accumulate_buffer(buffer, started_state)
}
}
#[glib::object_subclass]
@ -473,6 +639,16 @@ impl ObjectImpl for S3PutObjectSink {
.blurb("Force client to use path-style addressing for buckets")
.default_value(DEFAULT_FORCE_PATH_STYLE)
.build(),
glib::ParamSpecEnum::builder_with_default("next-file", DEFAULT_NEXT_FILE)
.nick("Next File")
.blurb("When to start new file")
.mutable_ready()
.build(),
glib::ParamSpecUInt64::builder("min-keyframe-distance")
.nick("Minimum keyframe distance")
.blurb("Minimum distance between keyframes to start a new file")
.default_value(DEFAULT_MIN_KEYFRAME_DISTANCE.into())
.build(),
]
});
@ -587,6 +763,14 @@ impl ObjectImpl for S3PutObjectSink {
"force-path-style" => {
settings.force_path_style = value.get::<bool>().expect("type checked upstream");
}
"next-file" => {
settings.next_file = value.get::<NextFile>().expect("type checked upstream");
}
"min-keyframe-distance" => {
settings.min_keyframe_distance = value
.get::<gst::ClockTime>()
.expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@ -624,6 +808,8 @@ impl ObjectImpl for S3PutObjectSink {
"flush-interval-time" => settings.flush_interval_time.to_value(),
"flush-on-error" => settings.flush_on_error.to_value(),
"force-path-style" => settings.force_path_style.to_value(),
"min-keyframe-distance" => settings.min_keyframe_distance.to_value(),
"next-file" => settings.next_file.to_value(),
_ => unimplemented!(),
}
}
@ -634,6 +820,8 @@ impl GstObjectImpl for S3PutObjectSink {}
impl ElementImpl for S3PutObjectSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
#[cfg(feature = "doc")]
NextFile::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
gst::subclass::ElementMetadata::new(
"Amazon S3 PutObject sink",
"Source/Network",
@ -672,77 +860,77 @@ impl BaseSinkImpl for S3PutObjectSink {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if let State::Started(ref started_state) = *state {
if let State::Started(ref mut started_state) = *state {
if settings.flush_on_error && started_state.need_flush {
drop(settings);
drop(state);
gst::warning!(CAT, imp = self, "Stopped without EOS, but flushing");
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp = self,
"Failed to finalize the upload: {:?}",
error_message
);
if self.write_put_object_request(started_state).is_err() {
gst::error!(CAT, imp = self, "Failed to finalize the next-file upload",);
}
state = self.state.lock().unwrap();
}
}
*state = State::Stopped;
gst::info!(CAT, imp = self, "Stopped");
Ok(())
}
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
self.write_buffer(buffer)
}
let started_state = match *state {
State::Started(ref mut s) => s,
State::Stopped => {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
fn event(&self, event: gst::Event) -> bool {
use gst::EventView;
match event.view() {
EventView::CustomDownstream(ev) => {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
let is_next_key_unit_event = next_file == NextFile::KeyUnitEvent;
drop(settings);
if is_next_key_unit_event && gst_video::ForceKeyUnitEvent::is(ev) {
use gst_video::DownstreamForceKeyUnitEvent;
match DownstreamForceKeyUnitEvent::parse(ev) {
Ok(_key_unit_event) => {
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut started_state) = *state {
if let Err(e) = self.write_put_object_request(started_state) {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to write on KeyUnitEvent, {e}"]
);
}
}
}
Err(e) => gst::error!(CAT, "Failed to parse key unit event: {e}"),
}
}
}
};
EventView::Eos(_) => {
let mut state = self.state.lock().unwrap();
if started_state.start_pts.is_none() {
started_state.start_pts = buffer.pts();
}
if let State::Started(ref mut started_state) = *state {
started_state.need_flush = false;
started_state.num_buffers += 1;
started_state.need_flush = true;
gst::trace!(CAT, imp = self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
started_state.buffer.extend_from_slice(map.as_slice());
if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) {
return Ok(gst::FlowSuccess::Ok);
}
drop(state);
match self.flush_buffer() {
Ok(_) => Ok(gst::FlowSuccess::Ok),
Err(err) => match err {
Some(error_message) => {
gst::error!(CAT, imp = self, "Upload failed: {}", error_message);
self.post_error_message(error_message);
Err(gst::FlowError::Error)
if self.write_put_object_request(started_state).is_err() {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to finalize the upload"]
);
}
}
_ => {
gst::info!(CAT, imp = self, "Upload interrupted. Flushing...");
Err(gst::FlowError::Flushing)
}
},
}
_ => (),
}
BaseSinkImplExt::parent_event(self, event)
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
@ -757,27 +945,55 @@ impl BaseSinkImpl for S3PutObjectSink {
Ok(())
}
fn event(&self, event: gst::Event) -> bool {
if let gst::EventView::Eos(_) = event.view() {
let mut state = self.state.lock().unwrap();
fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
let s = caps
.structure(0)
.ok_or(gst::loggable_error!(CAT, "Missing caps in set_caps"))?;
if let State::Started(ref mut started_state) = *state {
started_state.need_flush = false;
if let Ok(Some(streamheaders)) = s.get_optional::<gst::ArrayRef>("streamheader") {
if streamheaders.is_empty() {
return Ok(());
}
drop(state);
let streamheaders = streamheaders.as_slice();
let mut headers: Vec<u8> = Vec::new();
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp = self,
"Failed to finalize the upload: {:?}",
error_message
);
return false;
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
return Err(gst::loggable_error!(CAT, "Element should be started"));
}
};
started_state.streamheaders_size = 0;
for header in streamheaders {
let buffer = header.get::<Option<gst::Buffer>>();
if let Ok(Some(buf)) = buffer {
let map = buf.map_readable().map_err(|_| {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to map streamheader buffer"]
);
gst::loggable_error!(CAT, "Failed to map streamheader buffer")
})?;
headers.extend_from_slice(map.as_slice());
started_state.streamheaders_size += map.size() as u64;
}
}
if !headers.is_empty() {
let _ = started_state.streamheaders.take();
gst::info!(CAT, imp = self, "Got streamheaders");
started_state.streamheaders = Some(headers);
}
}
BaseSinkImplExt::parent_event(self, event)
self.parent_set_caps(caps)
}
}