mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-10 19:25:26 +00:00
aws: s3putobjectsink: Add some thresholds for flushing
Lets us connect when we perform a flush Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1337>
This commit is contained in:
parent
a54b2dd39e
commit
12dbf50ddc
2 changed files with 160 additions and 16 deletions
|
@ -31,6 +31,9 @@ use crate::s3url::*;
|
||||||
use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
|
use crate::s3utils::{self, duration_from_millis, duration_to_millis, WaitError};
|
||||||
|
|
||||||
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
|
const DEFAULT_RETRY_ATTEMPTS: u32 = 5;
|
||||||
|
const DEFAULT_FLUSH_INTERVAL_BUFFERS: u64 = 1;
|
||||||
|
const DEFAULT_FLUSH_INTERVAL_BYTES: u64 = 0;
|
||||||
|
const DEFAULT_FLUSH_INTERVAL_TIME: gst::ClockTime = gst::ClockTime::from_nseconds(0);
|
||||||
|
|
||||||
// General setting for create / abort requests
|
// General setting for create / abort requests
|
||||||
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
||||||
|
@ -38,11 +41,18 @@ const DEFAULT_REQUEST_TIMEOUT_MSEC: u64 = 15_000;
|
||||||
struct Started {
|
struct Started {
|
||||||
client: Client,
|
client: Client,
|
||||||
buffer: Vec<u8>,
|
buffer: Vec<u8>,
|
||||||
|
start_pts: Option<gst::ClockTime>,
|
||||||
|
num_buffers: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Started {
|
impl Started {
|
||||||
pub fn new(client: Client, buffer: Vec<u8>) -> Started {
|
pub fn new(client: Client, buffer: Vec<u8>) -> Started {
|
||||||
Started { client, buffer }
|
Started {
|
||||||
|
client,
|
||||||
|
buffer,
|
||||||
|
start_pts: gst::ClockTime::NONE,
|
||||||
|
num_buffers: 0,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,6 +76,9 @@ struct Settings {
|
||||||
retry_attempts: u32,
|
retry_attempts: u32,
|
||||||
request_timeout: Duration,
|
request_timeout: Duration,
|
||||||
endpoint_uri: Option<String>,
|
endpoint_uri: Option<String>,
|
||||||
|
flush_interval_buffers: u64,
|
||||||
|
flush_interval_bytes: u64,
|
||||||
|
flush_interval_time: Option<gst::ClockTime>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Settings {
|
impl Settings {
|
||||||
|
@ -118,6 +131,9 @@ impl Default for Settings {
|
||||||
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
|
retry_attempts: DEFAULT_RETRY_ATTEMPTS,
|
||||||
request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
|
request_timeout: Duration::from_millis(DEFAULT_REQUEST_TIMEOUT_MSEC),
|
||||||
endpoint_uri: None,
|
endpoint_uri: None,
|
||||||
|
flush_interval_buffers: DEFAULT_FLUSH_INTERVAL_BUFFERS,
|
||||||
|
flush_interval_bytes: DEFAULT_FLUSH_INTERVAL_BYTES,
|
||||||
|
flush_interval_time: Some(DEFAULT_FLUSH_INTERVAL_TIME),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,6 +155,40 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
});
|
});
|
||||||
|
|
||||||
impl S3PutObjectSink {
|
impl S3PutObjectSink {
|
||||||
|
fn check_thresholds(
|
||||||
|
&self,
|
||||||
|
state: &Started,
|
||||||
|
pts: Option<gst::ClockTime>,
|
||||||
|
duration: Option<gst::ClockTime>,
|
||||||
|
) -> bool {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
#[allow(clippy::if_same_then_else)]
|
||||||
|
#[allow(clippy::needless_bool)]
|
||||||
|
// Verbose if/else form for readability
|
||||||
|
if settings.flush_interval_buffers > 0
|
||||||
|
&& (state.num_buffers % settings.flush_interval_buffers) == 0
|
||||||
|
{
|
||||||
|
true
|
||||||
|
} else if settings.flush_interval_bytes > 0
|
||||||
|
&& (state.buffer.len() as u64 % settings.flush_interval_bytes) == 0
|
||||||
|
{
|
||||||
|
true
|
||||||
|
} else if settings.flush_interval_time.is_some()
|
||||||
|
&& settings.flush_interval_time.unwrap() != DEFAULT_FLUSH_INTERVAL_TIME
|
||||||
|
&& state.start_pts.is_some()
|
||||||
|
&& pts.is_some()
|
||||||
|
&& duration.is_some()
|
||||||
|
&& (pts.unwrap() - state.start_pts.unwrap() + duration.unwrap())
|
||||||
|
% settings.flush_interval_time.unwrap()
|
||||||
|
== gst::ClockTime::from_nseconds(0)
|
||||||
|
{
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
|
fn flush_buffer(&self) -> Result<(), Option<gst::ErrorMessage>> {
|
||||||
let put_object_req = self.create_put_object_request();
|
let put_object_req = self.create_put_object_request();
|
||||||
|
|
||||||
|
@ -370,6 +420,21 @@ impl ObjectImpl for S3PutObjectSink {
|
||||||
.nick("content-disposition")
|
.nick("content-disposition")
|
||||||
.blurb("Content-Disposition header to set for uploaded object")
|
.blurb("Content-Disposition header to set for uploaded object")
|
||||||
.build(),
|
.build(),
|
||||||
|
glib::ParamSpecUInt64::builder("flush-interval-buffers")
|
||||||
|
.nick("Flush interval in buffers")
|
||||||
|
.blurb("Number of buffers to accumulate before doing a write (0 => disable)")
|
||||||
|
.default_value(DEFAULT_FLUSH_INTERVAL_BUFFERS)
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt64::builder("flush-interval-bytes")
|
||||||
|
.nick("Flush interval in bytes")
|
||||||
|
.blurb("Number of bytes to accumulate before doing a write (0 => disable)")
|
||||||
|
.default_value(DEFAULT_FLUSH_INTERVAL_BYTES)
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt64::builder("flush-interval-time")
|
||||||
|
.nick("Flush interval in duration")
|
||||||
|
.blurb("Total duration of buffers to accumulate before doing a write (0 => disable)")
|
||||||
|
.default_value(DEFAULT_FLUSH_INTERVAL_TIME.nseconds())
|
||||||
|
.build(),
|
||||||
]
|
]
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -451,6 +516,18 @@ impl ObjectImpl for S3PutObjectSink {
|
||||||
.get::<Option<String>>()
|
.get::<Option<String>>()
|
||||||
.expect("type checked upstream");
|
.expect("type checked upstream");
|
||||||
}
|
}
|
||||||
|
"flush-interval-buffers" => {
|
||||||
|
settings.flush_interval_buffers =
|
||||||
|
value.get::<u64>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"flush-interval-bytes" => {
|
||||||
|
settings.flush_interval_bytes = value.get::<u64>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"flush-interval-time" => {
|
||||||
|
settings.flush_interval_time = value
|
||||||
|
.get::<Option<gst::ClockTime>>()
|
||||||
|
.expect("type checked upstream");
|
||||||
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -480,6 +557,9 @@ impl ObjectImpl for S3PutObjectSink {
|
||||||
"endpoint-uri" => settings.endpoint_uri.to_value(),
|
"endpoint-uri" => settings.endpoint_uri.to_value(),
|
||||||
"content-type" => settings.content_type.to_value(),
|
"content-type" => settings.content_type.to_value(),
|
||||||
"content-disposition" => settings.content_disposition.to_value(),
|
"content-disposition" => settings.content_disposition.to_value(),
|
||||||
|
"flush-interval-buffers" => settings.flush_interval_buffers.to_value(),
|
||||||
|
"flush-interval-bytes" => settings.flush_interval_bytes.to_value(),
|
||||||
|
"flush-interval-time" => settings.flush_interval_time.to_value(),
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -544,6 +624,12 @@ impl BaseSinkImpl for S3PutObjectSink {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if started_state.start_pts.is_none() {
|
||||||
|
started_state.start_pts = buffer.pts();
|
||||||
|
}
|
||||||
|
|
||||||
|
started_state.num_buffers += 1;
|
||||||
|
|
||||||
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
||||||
let map = buffer.map_readable().map_err(|_| {
|
let map = buffer.map_readable().map_err(|_| {
|
||||||
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
||||||
|
@ -551,6 +637,11 @@ impl BaseSinkImpl for S3PutObjectSink {
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
started_state.buffer.extend_from_slice(map.as_slice());
|
started_state.buffer.extend_from_slice(map.as_slice());
|
||||||
|
|
||||||
|
if !self.check_thresholds(started_state, buffer.pts(), buffer.duration()) {
|
||||||
|
return Ok(gst::FlowSuccess::Ok);
|
||||||
|
}
|
||||||
|
|
||||||
drop(state);
|
drop(state);
|
||||||
|
|
||||||
match self.flush_buffer() {
|
match self.flush_buffer() {
|
||||||
|
|
|
@ -28,6 +28,12 @@ mod tests {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn make_buffer(content: &[u8]) -> gst::Buffer {
|
||||||
|
let mut buf = gst::Buffer::from_slice(content.to_owned());
|
||||||
|
buf.make_mut().set_pts(gst::ClockTime::from_mseconds(200));
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
|
||||||
async fn delete_object(region: String, bucket: &str, key: &str) {
|
async fn delete_object(region: String, bucket: &str, key: &str) {
|
||||||
let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
|
let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
|
||||||
aws_sdk_s3::config::Region::new(region),
|
aws_sdk_s3::config::Region::new(region),
|
||||||
|
@ -70,11 +76,11 @@ mod tests {
|
||||||
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||||
h1.play();
|
h1.play();
|
||||||
|
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push_event(gst::event::Eos::new());
|
h1.push_event(gst::event::Eos::new());
|
||||||
|
|
||||||
let mut h2 = gst_check::Harness::new("awss3src");
|
let mut h2 = gst_check::Harness::new("awss3src");
|
||||||
|
@ -91,7 +97,12 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Common helper
|
// Common helper
|
||||||
async fn do_s3_putobject_test(key_prefix: &str) {
|
async fn do_s3_putobject_test(
|
||||||
|
key_prefix: &str,
|
||||||
|
buffers: Option<u64>,
|
||||||
|
bytes: Option<u64>,
|
||||||
|
time: Option<gst::ClockTime>,
|
||||||
|
) {
|
||||||
init();
|
init();
|
||||||
|
|
||||||
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
|
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
|
||||||
|
@ -103,22 +114,40 @@ mod tests {
|
||||||
|
|
||||||
// Manually add the element so we can configure it before it goes to PLAYING
|
// Manually add the element so we can configure it before it goes to PLAYING
|
||||||
let mut h1 = gst_check::Harness::new_empty();
|
let mut h1 = gst_check::Harness::new_empty();
|
||||||
|
|
||||||
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
|
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
|
||||||
// adding an element manually
|
// adding an element manually
|
||||||
|
|
||||||
h1.add_parse(
|
h1.add_parse(
|
||||||
format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\"")
|
format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\" name=\"sink\"")
|
||||||
.as_str(),
|
.as_str(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let h1el = h1
|
||||||
|
.element()
|
||||||
|
.unwrap()
|
||||||
|
.dynamic_cast::<gst::Bin>()
|
||||||
|
.unwrap()
|
||||||
|
.by_name("sink")
|
||||||
|
.unwrap();
|
||||||
|
if let Some(b) = buffers {
|
||||||
|
h1el.set_property("flush-interval-buffers", b)
|
||||||
|
};
|
||||||
|
if let Some(b) = bytes {
|
||||||
|
h1el.set_property("flush-interval-bytes", b)
|
||||||
|
};
|
||||||
|
if time.is_some() {
|
||||||
|
h1el.set_property("flush-interval-time", time)
|
||||||
|
};
|
||||||
|
|
||||||
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||||
h1.play();
|
h1.play();
|
||||||
|
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push(gst::Buffer::from_slice(content)).unwrap();
|
h1.push(make_buffer(content)).unwrap();
|
||||||
h1.push_event(gst::event::Eos::new());
|
h1.push_event(gst::event::Eos::new());
|
||||||
|
|
||||||
let mut h2 = gst_check::Harness::new("awss3src");
|
let mut h2 = gst_check::Harness::new("awss3src");
|
||||||
|
@ -151,16 +180,40 @@ mod tests {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_simple() {
|
async fn test_s3_put_object_simple() {
|
||||||
do_s3_putobject_test("s3-put-object-test").await;
|
do_s3_putobject_test("s3-put-object-test", None, None, None).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_whitespace() {
|
async fn test_s3_put_object_whitespace() {
|
||||||
do_s3_putobject_test("s3 put object test").await;
|
do_s3_putobject_test("s3 put object test", None, None, None).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_s3_put_object_unicode() {
|
async fn test_s3_put_object_unicode() {
|
||||||
do_s3_putobject_test("s3 put object 🧪 😱").await;
|
do_s3_putobject_test("s3 put object 🧪 😱", None, None, None).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_s3_put_object_flush_buffers() {
|
||||||
|
// Awkward threshold as we push 5 buffers
|
||||||
|
do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_s3_put_object_flush_bytes() {
|
||||||
|
// Awkward threshold as we push 14 bytes per buffer
|
||||||
|
do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_s3_put_object_flush_time() {
|
||||||
|
do_s3_putobject_test(
|
||||||
|
"s3-put-object-test ftime",
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
// Awkward threshold as we push each buffer with 200ms
|
||||||
|
Some(gst::ClockTime::from_mseconds(300)),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue