gst-plugins-rs/net/aws/tests/s3.rs
François Laignel 9f27bde36a aws: use fixed BehaviorVersion
Quoting [`BehaviorVersion` documentation]:

> Over time, new best-practice behaviors are introduced. However, these
> behaviors might not be backwards compatible. For example, a change which
> introduces new default timeouts or a new retry-mode for all operations might
> be the ideal behavior but could break existing applications.

This commit uses `BehaviorVersion::v2023_11_09()`, which is the latest
major version at the moment. When a new major version is released, the method
will be deprecated, which will warn us of the new version and let us decide
when to upgrade, after any changes if required. This is safer that using
`latest()` which would silently use a different major version, possibly
breaking existing code.

[`BehaviorVersion` documentation]: https://docs.rs/aws-config/1.1.8/aws_config/struct.BehaviorVersion.html

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1520>
2024-03-26 17:44:16 +01:00

267 lines
8.5 KiB
Rust

// Copyright (C) 2022, Asymptotic Inc.
// Author: Arun Raghavan <arun@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
//
// The test times out on Windows for some reason, skip until we figure out why
#[cfg(not(target_os = "windows"))]
#[cfg(test)]
mod tests {
use gst::prelude::*;
use gstaws::s3utils::{AWS_BEHAVIOR_VERSION, DEFAULT_S3_REGION};
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstaws::plugin_register_static().unwrap();
// Makes it easier to get AWS SDK logs if needed
env_logger::init();
});
}
fn make_buffer(content: &[u8]) -> gst::Buffer {
let mut buf = gst::Buffer::from_slice(content.to_owned());
buf.make_mut().set_pts(gst::ClockTime::from_mseconds(200));
buf
}
async fn delete_object(region: String, bucket: &str, key: &str) {
let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
aws_sdk_s3::config::Region::new(region),
)
.or_default_provider();
let config = aws_config::defaults(AWS_BEHAVIOR_VERSION.clone())
.region(region_provider)
.load()
.await;
let client = aws_sdk_s3::Client::new(&config);
client
.delete_object()
.bucket(bucket)
.key(key)
.send()
.await
.unwrap();
}
// Common helper
async fn do_s3_multipart_test(key_prefix: &str) {
init();
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
let bucket =
std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
let uri = format!("s3://{region}/{bucket}/{key}");
let content = "Hello, world!\n".as_bytes();
// Manually add the element so we can configure it before it goes to PLAYING
let mut h1 = gst_check::Harness::new_empty();
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
// adding an element manually
h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str());
h1.set_src_caps(gst::Caps::builder("text/plain").build());
h1.play();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push_event(gst::event::Eos::new());
let mut h2 = gst_check::Harness::new("awss3src");
h2.element().unwrap().set_property("uri", uri.clone());
h2.play();
let buf = h2.pull_until_eos().unwrap().unwrap();
assert_eq!(
content.repeat(5),
buf.into_mapped_buffer_readable().unwrap().as_slice()
);
delete_object(region.clone(), &bucket, &key).await;
}
// Common helper
async fn do_s3_putobject_test(
key_prefix: &str,
buffers: Option<u64>,
bytes: Option<u64>,
time: Option<gst::ClockTime>,
do_eos: bool,
) {
init();
let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
let bucket =
std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
let uri = format!("s3://{region}/{bucket}/{key}");
let content = "Hello, world!\n".as_bytes();
// Manually add the element so we can configure it before it goes to PLAYING
let mut h1 = gst_check::Harness::new_empty();
// Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating and
// adding an element manually
h1.add_parse(
format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\" name=\"sink\"")
.as_str(),
);
let h1el = h1
.element()
.unwrap()
.dynamic_cast::<gst::Bin>()
.unwrap()
.by_name("sink")
.unwrap();
if let Some(b) = buffers {
h1el.set_property("flush-interval-buffers", b)
};
if let Some(b) = bytes {
h1el.set_property("flush-interval-bytes", b)
};
if time.is_some() {
h1el.set_property("flush-interval-time", time)
};
if !do_eos {
h1el.set_property("flush-on-error", true)
}
h1.set_src_caps(gst::Caps::builder("text/plain").build());
h1.play();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
h1.push(make_buffer(content)).unwrap();
if do_eos {
h1.push_event(gst::event::Eos::new());
} else {
// teardown to trigger end
drop(h1);
}
let mut h2 = gst_check::Harness::new("awss3src");
h2.element().unwrap().set_property("uri", uri.clone());
h2.play();
let buf = h2.pull_until_eos().unwrap().unwrap();
assert_eq!(
content.repeat(5),
buf.into_mapped_buffer_readable().unwrap().as_slice()
);
delete_object(region.clone(), &bucket, &key).await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_multipart_simple() {
do_s3_multipart_test("s3-test").await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_multipart_whitespace() {
do_s3_multipart_test("s3 test").await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_multipart_unicode() {
do_s3_multipart_test("s3 🧪 😱").await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_simple() {
do_s3_putobject_test("s3-put-object-test", None, None, None, true).await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_whitespace() {
do_s3_putobject_test("s3 put object test", None, None, None, true).await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_unicode() {
do_s3_putobject_test("s3 put object 🧪 😱", None, None, None, true).await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_flush_buffers() {
// Awkward threshold as we push 5 buffers
do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None, true).await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_flush_bytes() {
// Awkward threshold as we push 14 bytes per buffer
do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None, true).await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_flush_time() {
do_s3_putobject_test(
"s3-put-object-test ftime",
None,
None,
// Awkward threshold as we push each buffer with 200ms
Some(gst::ClockTime::from_mseconds(300)),
true,
)
.await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_on_eos() {
// Disable all flush thresholds, so only EOS causes a flush
do_s3_putobject_test(
"s3-put-object-test eos",
Some(0),
Some(0),
Some(gst::ClockTime::from_nseconds(0)),
true,
)
.await;
}
#[ignore = "failing, needs investigation"]
#[tokio::test]
async fn test_s3_put_object_without_eos() {
// Disable all flush thresholds, skip EOS, and cause a flush on error
do_s3_putobject_test(
"s3-put-object-test !eos",
Some(0),
Some(0),
Some(gst::ClockTime::from_nseconds(0)),
false,
)
.await;
}
}