Merge branch 'putobjectsink-multi-file' into 'main'

aws: Add `next-file` support to putobjectsink

Closes #481

See merge request gstreamer/gst-plugins-rs!1550
Sanchayan Maity 2024-05-03 15:27:08 +00:00
commit 00884ee292
6 changed files with 475 additions and 121 deletions
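As a usage sketch (not part of this diff): the new behaviour can be configured from application code through gstreamer-rs as below. The element name and the two new properties come from this commit; the bucket, region and key values are placeholders, and a key containing a `%0…` printf-style conversion is expanded with a running file index so each finished file becomes its own S3 object.

use gst::prelude::*;

fn main() {
    gst::init().unwrap();

    // Hypothetical configuration; bucket/region/key are placeholders.
    let sink = gst::ElementFactory::make("s3putobjectsink")
        .property("bucket", "my-bucket")
        .property("region", "eu-west-2")
        // "%05d" in the key is expanded with the running file index.
        .property("key", "recording-%05d.ts")
        // New in this commit: start a new object at each key frame,
        // at most once per min-keyframe-distance (default 10 seconds).
        .property_from_str("next-file", "next-key-frame")
        .property("min-keyframe-distance", gst::ClockTime::from_seconds(10))
        .build()
        .expect("requires the gst-plugins-rs aws plugin");

    let _ = sink;
}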

Cargo.lock (generated)

@@ -2243,12 +2243,14 @@ dependencies = [
"gstreamer-audio",
"gstreamer-base",
"gstreamer-check",
"gstreamer-video",
"once_cell",
"percent-encoding",
"rand",
"serde",
"serde_derive",
"serde_json",
"sprintf 0.2.1",
"test-with",
"tokio",
"url",
@@ -2482,7 +2484,7 @@ dependencies = [
"gstreamer-video",
"m3u8-rs",
"once_cell",
"sprintf",
"sprintf 0.1.4",
]
[[package]]
@@ -6354,6 +6356,15 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c0cdea5a20a06e7c57f627094e7b1618e5665592cd88f2d45fa4014e348db58"
[[package]]
name = "sprintf"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2819cb5194dfe9e6d102f4519a9fb9dc7106d2879b71b4fd4d4677f1175bd39"
dependencies = [
"thiserror",
]
[[package]]
name = "static_assertions"
version = "1.1.0"


@@ -23,6 +23,7 @@ You will find the following plugins in this repository:
- `aws`: Various elements for Amazon AWS services using the [AWS SDK](https://awslabs.github.io/aws-sdk-rust/) library
- `s3src`/`s3sink`: A source and sink element to talk to the Amazon S3 object storage system.
- `s3putobjectsink`: A sink element to talk to Amazon S3. Uses `PutObject` instead of the multipart upload that `s3sink` uses.
- `s3hlssink`: A sink element to store HLS streams on Amazon S3.
- `awstranscriber`: an element wrapping the AWS Transcriber service.
- `awstranscribeparse`: an element parsing the packets of the AWS Transcriber service.


@@ -362,6 +362,32 @@
"type": "GstStructure",
"writable": true
},
"min-keyframe-distance": {
"blurb": "Minimum distance between keyframes to start a new file",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "10000000000",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"next-file": {
"blurb": "When to start new file",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "next-buffer (0)",
"mutable": "ready",
"readable": false,
"type": "GstS3PutObjectSinkNextFile",
"writable": true
},
"region": {
"blurb": "An AWS region (e.g. eu-west-2).",
"conditionally-available": false,
@@ -1183,6 +1209,41 @@
}
]
},
"GstS3PutObjectSinkNextFile": {
"kind": "enum",
"values": [
{
"desc": "NextBuffer: New file for each buffer.",
"name": "next-buffer",
"value": "0"
},
{
"desc": "NextDiscont: New file after each discontinuity.",
"name": "next-discont",
"value": "1"
},
{
"desc": "NextKeyFrame: New file at each key frame.",
"name": "next-key-frame",
"value": "2"
},
{
"desc": "NextKeyUnitEvent: New file after a force key unit event.",
"name": "next-key-unit-event",
"value": "3"
},
{
"desc": "NextMaxSize: New file when the configured maximum file size would be exceeded with the next buffer or buffer list.",
"name": "next-max-size",
"value": "4"
},
{
"desc": "NextMaxDuration: New file when the configured maximum duration would be exceeded with the next buffer or buffer list.",
"name": "next-max-duration",
"value": "5"
}
]
},
"GstS3SinkOnError": {
"kind": "enum",
"values": [
@@ -11781,4 +11842,4 @@
"tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
}
}
}
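
A note on the next-max-size and next-max-duration modes documented above: the implementation further down reuses the existing flush-interval-bytes and flush-interval-time properties as the per-file size and duration caps (accumulated stream headers count toward the size). A sketch, continuing the hypothetical configuration shown earlier:

// Cap each uploaded object at roughly 8 MiB; in next-max-size mode
// flush-interval-bytes doubles as the per-file size limit.
let sink = gst::ElementFactory::make("s3putobjectsink")
    .property("bucket", "my-bucket")
    .property("key", "chunk-%05d.mp4")
    .property_from_str("next-file", "next-max-size")
    .property("flush-interval-bytes", 8u64 * 1024 * 1024)
    .build()
    .expect("requires the gst-plugins-rs aws plugin");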


@@ -32,6 +32,8 @@ serde_derive = "1"
serde_json = "1"
url = "2"
once_cell.workspace = true
gst-video = { workspace = true, features = ["v1_22"] }
sprintf = "0.2"
[dev-dependencies]
chrono = { version = "0.4", features = [ "alloc" ] }


@@ -14,6 +14,39 @@ use gst::prelude::*;
mod multipartsink;
mod putobjectsink;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3PutObjectSinkNextFile")]
pub(crate) enum NextFile {
#[enum_value(name = "NextBuffer: New file for each buffer.", nick = "next-buffer")]
Buffer,
#[enum_value(
name = "NextDiscont: New file after each discontinuity.",
nick = "next-discont"
)]
Discont,
#[enum_value(
name = "NextKeyFrame: New file at each key frame.",
nick = "next-key-frame"
)]
KeyFrame,
#[enum_value(
name = "NextKeyUnitEvent: New file after a force key unit event.",
nick = "next-key-unit-event"
)]
KeyUnitEvent,
#[enum_value(
name = "NextMaxSize: New file when the configured maximum file size would be exceeded with the next buffer or buffer list.",
nick = "next-max-size"
)]
MaxSize,
#[enum_value(
name = "NextMaxDuration: New file when the configured maximum duration would be exceeded with the next buffer or buffer list.",
nick = "next-max-duration"
)]
MaxDuration,
}
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstS3SinkOnError")]


@@ -15,12 +15,12 @@ use gst_base::subclass::prelude::*;
use aws_sdk_s3::{
config::{self, retry::RetryConfig, Credentials, Region},
error::ProvideErrorMetadata,
operation::put_object::builders::PutObjectFluentBuilder,
primitives::ByteStream,
Client,
};
use super::NextFile;
use futures::future;
use once_cell::sync::Lazy;
use std::collections::HashMap;
@@ -37,6 +37,8 @@ const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
const DEFAULT_FLUSH_ON_ERROR: bool = false;
const DEFAULT_FORCE_PATH_STYLE: bool = false;
const DEFAULT_NEXT_FILE: NextFile = NextFile::Buffer;
const DEFAULT_MIN_KEYFRAME_DISTANCE: gst::ClockTime = gst::ClockTime::from_seconds(10);
// General setting for create / abort requests
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
@@ -47,6 +49,12 @@ struct Started {
start_pts: Option<gst::ClockTime>,
num_buffers: u64,
need_flush: bool,
index: u64,
next_segment: Option<gst::ClockTime>,
streamheaders: Option<Vec<u8>>,
streamheaders_size: u64,
force_key_unit_count: i32,
file_pts: Option<gst::ClockTime>,
}
impl Started {
@@ -57,6 +65,12 @@ impl Started {
start_pts: gst::ClockTime::NONE,
num_buffers: 0,
need_flush: false,
index: 0,
next_segment: gst::ClockTime::NONE,
streamheaders: None,
streamheaders_size: 0,
force_key_unit_count: -1,
file_pts: gst::ClockTime::NONE,
}
}
}
@@ -86,6 +100,8 @@ struct Settings {
flush_interval_bytes: u64,
flush_interval_time: Option<gst::ClockTime>,
flush_on_error: bool,
next_file: NextFile,
min_keyframe_distance: gst::ClockTime,
}
impl Settings {
@@ -143,6 +159,8 @@ impl Default for Settings {
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
flush_on_error: DEFAULT_FLUSH_ON_ERROR,
next_file: DEFAULT_NEXT_FILE,
min_keyframe_distance: DEFAULT_MIN_KEYFRAME_DISTANCE,
}
}
}
@@ -164,13 +182,9 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
});
impl S3PutObjectSink {
fn check_thresholds(
&self,
state: &Started,
pts: Option<gst::ClockTime>,
duration: Option<gst::ClockTime>,
) -> bool {
let settings = self.settings.lock().unwrap();
fn check_thresholds(&self, settings: &Settings, state: &Started, buffer: &gst::Buffer) -> bool {
let pts = buffer.pts();
let duration = buffer.duration();
#[allow(clippy::if_same_then_else)]
#[allow(clippy::needless_bool)]
@@ -198,51 +212,6 @@
}
}
fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
let put_object_req = self.create_put_object_request();
let put_object_req_future = put_object_req.send();
let _output =
s3utils::wait(&self.canceller, put_object_req_future).map_err(|err| match err {
WaitError::FutureError(err) => Some(gst::error_msg!(
gst::ResourceError::OpenWrite,
["Failed to upload object: {err}: {}", err.meta()]
)),
WaitError::Cancelled => None,
})?;
gst::debug!(CAT, imp: self, "Upload complete");
Ok(())
}
fn create_put_object_request(&self) -> PutObjectFluentBuilder {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
let state = self.state.lock().unwrap();
let state = match *state {
State::Started(ref started_state) => started_state,
State::Stopped => {
unreachable!("Element should be started");
}
};
let body = Some(ByteStream::from(state.buffer.clone()));
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
let key = Some(url.as_ref().unwrap().object.to_owned());
let metadata = settings.to_metadata(self);
let client = &state.client;
client
.put_object()
.set_body(body)
.set_bucket(bucket)
.set_key(key)
.set_metadata(metadata)
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
@@ -351,6 +320,217 @@
)),
}
}
fn accumulate_buffer(
&self,
buffer: &gst::Buffer,
started_state: &mut Started,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if started_state.start_pts.is_none() {
started_state.start_pts = buffer.pts();
}
started_state.num_buffers += 1;
started_state.need_flush = true;
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
started_state.buffer.extend_from_slice(map.as_slice());
Ok(gst::FlowSuccess::Ok)
}
fn create_body_with_streamheaders(
&self,
next_file: NextFile,
streamheaders: &Option<Vec<u8>>,
buffer: &[u8],
) -> ByteStream {
match next_file {
NextFile::KeyFrame | NextFile::MaxSize | NextFile::MaxDuration => {
if let Some(headers) = streamheaders {
let with_sh = [&headers[..], buffer].concat();
ByteStream::from(with_sh)
} else {
ByteStream::from(buffer.to_vec())
}
}
_ => ByteStream::from(buffer.to_vec()),
}
}
fn create_put_object_request(
&self,
started_state: &mut Started,
) -> Result<Option<PutObjectFluentBuilder>, gst::FlowError> {
let url = self.url.lock().unwrap();
let settings = self.settings.lock().unwrap();
if started_state.buffer.is_empty() {
return Ok(None);
}
let body = Some(self.create_body_with_streamheaders(
settings.next_file,
&started_state.streamheaders,
&started_state.buffer,
));
let bucket = Some(url.as_ref().unwrap().bucket.to_owned());
let object = url.as_ref().unwrap().object.to_owned();
let key = if object.contains("%0") {
match sprintf::sprintf!(&object, started_state.index) {
Ok(k) => {
/* Equivalent to opening a new file */
started_state.index += 1;
started_state.buffer = Vec::new();
Some(k)
}
Err(e) => {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to format file name: {}", e]
);
return Err(gst::FlowError::Error);
}
}
} else {
Some(object)
};
let metadata = settings.to_metadata(self);
let client = &started_state.client;
Ok(Some(
client
.put_object()
.set_body(body)
.set_bucket(bucket)
.set_key(key)
.set_metadata(metadata),
))
}
fn to_write_next_file(
&self,
started_state: &mut Started,
buffer: &gst::Buffer,
buffer_size: u64,
) -> bool {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
let max_file_size = settings.flush_interval_bytes;
let max_file_duration = settings.flush_interval_time;
let min_keyframe_distance = settings.min_keyframe_distance;
match next_file {
NextFile::Buffer => self.check_thresholds(&settings, started_state, buffer),
NextFile::MaxSize => {
started_state.buffer.len() as u64 + started_state.streamheaders_size + buffer_size
> max_file_size
}
NextFile::MaxDuration => {
let mut new_duration = gst::ClockTime::ZERO;
if buffer.pts().is_some() && started_state.file_pts.is_some() {
new_duration = buffer.pts().unwrap() - started_state.file_pts.unwrap();
if buffer.duration().is_some() {
new_duration += buffer.duration().unwrap();
}
}
started_state.file_pts = match buffer.pts() {
Some(pts) => Some(pts),
None => started_state.file_pts,
};
new_duration > max_file_duration.unwrap()
}
NextFile::KeyFrame => {
if started_state.next_segment == gst::ClockTime::NONE && buffer.pts().is_some() {
started_state.next_segment =
Some(buffer.pts().unwrap() + min_keyframe_distance);
}
if buffer.pts().is_some() {
let buffer_ts = buffer.pts().unwrap();
let delta_unit = buffer.flags().contains(gst::BufferFlags::DELTA_UNIT);
let next_segment = started_state
.next_segment
.expect("Next segment must be valid here");
if buffer_ts >= next_segment && !delta_unit {
started_state.next_segment = Some(next_segment + min_keyframe_distance);
true
} else {
false
}
} else {
false
}
}
NextFile::Discont => buffer.flags().contains(gst::BufferFlags::DISCONT),
NextFile::KeyUnitEvent => true,
}
}
fn write_put_object_request_with_next_file(
&self,
started_state: &mut Started,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let req = self.create_put_object_request(started_state)?;
if let Some(put_object_req) = req {
let put_object_req_future = put_object_req.send();
match s3utils::wait(&self.canceller, put_object_req_future) {
Ok(_) => Ok(gst::FlowSuccess::Ok),
Err(err) => match err {
WaitError::Cancelled => Ok(gst::FlowSuccess::Ok),
WaitError::FutureError(e) => {
gst::element_imp_error!(self, gst::CoreError::Failed, ["{}", e]);
Err(gst::FlowError::Error)
}
},
}
} else {
Ok(gst::FlowSuccess::Ok)
}
}
fn write_buffer_with_next_file(
&self,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
}
};
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
if self.to_write_next_file(started_state, buffer, map.size() as u64) {
self.write_put_object_request_with_next_file(started_state)?;
}
self.accumulate_buffer(buffer, started_state)
}
}
#[glib::object_subclass]
@@ -455,6 +635,17 @@ impl ObjectImpl for S3PutObjectSink {
.blurb("Force client to use path-style addressing for buckets")
.default_value(DEFAULT_FORCE_PATH_STYLE)
.build(),
glib::ParamSpecEnum::builder_with_default("next-file", DEFAULT_NEXT_FILE)
.nick("Next File")
.blurb("When to start new file")
.write_only()
.mutable_ready()
.build(),
glib::ParamSpecUInt64::builder("min-keyframe-distance")
.nick("Minimum keyframe distance")
.blurb("Minimum distance between keyframes to start a new file")
.default_value(DEFAULT_MIN_KEYFRAME_DISTANCE.into())
.build(),
]
});
@@ -554,6 +745,14 @@ impl ObjectImpl for S3PutObjectSink {
"force-path-style" => {
settings.force_path_style = value.get::<bool>().expect("type checked upstream");
}
"next-file" => {
settings.next_file = value.get::<NextFile>().expect("type checked upstream");
}
"min-keyframe-distance" => {
settings.min_keyframe_distance = value
.get::<gst::ClockTime>()
.expect("type checked upstream");
}
_ => unimplemented!(),
}
}
@@ -588,6 +787,7 @@ impl ObjectImpl for S3PutObjectSink {
"flush-interval-time" => settings.flush_interval_time.to_value(),
"flush-on-error" => settings.flush_on_error.to_value(),
"force-path-style" => settings.force_path_style.to_value(),
"min-keyframe-distance" => settings.min_keyframe_distance.to_value(),
_ => unimplemented!(),
}
}
@@ -598,6 +798,8 @@ impl GstObjectImpl for S3PutObjectSink {}
impl ElementImpl for S3PutObjectSink {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
#[cfg(feature = "doc")]
NextFile::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
gst::subclass::ElementMetadata::new(
"Amazon S3 PutObject sink",
"Source/Network",
@@ -636,77 +838,32 @@ impl BaseSinkImpl for S3PutObjectSink {
let mut state = self.state.lock().unwrap();
let settings = self.settings.lock().unwrap();
if let State::Started(ref started_state) = *state {
if let State::Started(ref mut started_state) = *state {
if settings.flush_on_error && started_state.need_flush {
drop(settings);
drop(state);
gst::warning!(CAT, imp: self, "Stopped without EOS, but flushing");
if let Err(error_message) = self.flush_buffer() {
if self
.write_put_object_request_with_next_file(started_state)
.is_err()
{
gst::error!(
CAT,
imp: self,
"Failed to finalize the upload: {:?}",
error_message
"Failed to finalize the next-file upload",
);
}
state = self.state.lock().unwrap();
}
}
*state = State::Stopped;
gst::info!(CAT, imp: self, "Stopped");
Ok(())
}
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut s) => s,
State::Stopped => {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
return Err(gst::FlowError::Error);
}
};
if started_state.start_pts.is_none() {
started_state.start_pts = buffer.pts();
}
started_state.num_buffers += 1;
started_state.need_flush = true;
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
let map = buffer.map_readable().map_err(|_| {
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
gst::FlowError::Error
})?;
started_state.buffer.extend_from_slice(map.as_slice());
if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) {
return Ok(gst::FlowSuccess::Ok);
}
drop(state);
match self.flush_buffer() {
Ok(_) => Ok(gst::FlowSuccess::Ok),
Err(err) => match err {
Some(error_message) => {
gst::error!(CAT, imp: self, "Upload failed: {}", error_message);
self.post_error_message(error_message);
Err(gst::FlowError::Error)
}
_ => {
gst::info!(CAT, imp: self, "Upload interrupted. Flushing...");
Err(gst::FlowError::Flushing)
}
},
}
self.write_buffer_with_next_file(buffer)
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
@@ -716,26 +873,115 @@ impl BaseSinkImpl for S3PutObjectSink {
}
fn event(&self, event: gst::Event) -> bool {
if let gst::EventView::Eos(_) = event.view() {
let mut state = self.state.lock().unwrap();
use gst::EventView;
if let State::Started(ref mut started_state) = *state {
started_state.need_flush = false;
match event.view() {
EventView::CustomDownstream(ev) => {
let settings = self.settings.lock().unwrap();
let next_file = settings.next_file;
let is_next_key_unit_event = next_file == NextFile::KeyUnitEvent;
drop(settings);
/* Taken from gst_multi_file_sink_event */
if is_next_key_unit_event && gst_video::ForceKeyUnitEvent::is(ev) {
use gst_video::DownstreamForceKeyUnitEvent;
match DownstreamForceKeyUnitEvent::parse(ev) {
Ok(key_unit_event) => {
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut started_state) = *state {
if started_state.force_key_unit_count != -1
&& started_state.force_key_unit_count as u32
== key_unit_event.count
{
return BaseSinkImplExt::parent_event(self, event);
}
started_state.force_key_unit_count = key_unit_event.count as i32;
let _ = self.write_put_object_request_with_next_file(started_state);
}
}
Err(e) => gst::error!(CAT, "Failed to parse key unit event: {}", e),
}
}
}
EventView::Eos(_) => {
let mut state = self.state.lock().unwrap();
drop(state);
if let State::Started(ref mut started_state) = *state {
started_state.need_flush = false;
if let Err(error_message) = self.flush_buffer() {
gst::error!(
CAT,
imp: self,
"Failed to finalize the upload: {:?}",
error_message
);
return false;
if self
.write_put_object_request_with_next_file(started_state)
.is_err()
{
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to finalize the upload"]
);
}
}
}
_ => (),
}
BaseSinkImplExt::parent_event(self, event)
}
fn set_caps(&self, caps: &gst::Caps) -> Result<(), gst::LoggableError> {
let s = caps
.structure(0)
.ok_or(gst::loggable_error!(CAT, "Missing caps in set_caps"))?;
if let Ok(Some(streamheaders)) = s.get_optional::<gst::ArrayRef>("streamheader") {
if streamheaders.is_empty() {
return Ok(());
}
let streamheaders = streamheaders.as_slice();
let mut headers: Vec<u8> = Vec::new();
let mut state = self.state.lock().unwrap();
let started_state = match *state {
State::Started(ref mut started_state) => started_state,
State::Stopped => {
return Err(gst::loggable_error!(CAT, "Element should be started"));
}
};
started_state.streamheaders_size = 0;
for header in streamheaders {
let buffer = header.get::<Option<gst::Buffer>>();
if let Ok(Some(buf)) = buffer {
let map = buf.map_readable().map_err(|_| {
gst::element_imp_error!(
self,
gst::CoreError::Failed,
["Failed to map streamheader buffer"]
);
gst::loggable_error!(CAT, "Failed to map streamheader buffer")
})?;
headers.extend_from_slice(map.as_slice());
started_state.streamheaders_size += map.size() as u64;
}
}
if !headers.is_empty() {
let _ = started_state.streamheaders.take();
gst::info!(CAT, imp: self, "Got streamheaders");
started_state.streamheaders = Some(headers);
}
drop(state);
}
self.parent_set_caps(caps)
}
}
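
A closing note on the next-key-unit-event mode handled in event() above: the sink finishes the current object when a downstream force-key-unit event with a fresh count arrives. A sketch of triggering that from an application, assuming the gstreamer-video event builder and continuing the earlier hypothetical setup:

// With next-file=next-key-unit-event set on the sink, this asks it to
// finish the current object; the event count deduplicates repeats.
let event = gst_video::DownstreamForceKeyUnitEvent::builder()
    .count(1)
    .build();
let _ = sink.send_event(event);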