gst-plugins-rs/net/aws/tests/s3.rs
Arun Raghavan 8f96509f03 aws: s3: Enable tests again
We lost the environment variable checks during the addition of the
putobjectsink tests, which caused failures on MR branches.

It would be nicer to use some other mechanism to validate that the tests
can run, so we don't rely only on the environment, but for now this will
have to do.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1629>
2024-06-18 11:58:43 -04:00

// Copyright (C) 2022, Asymptotic Inc.
// Author: Arun Raghavan <arun@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
//
// Note: these tests need valid AWS credentials to run. To avoid failures on CI, we test for the
// existence of AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables and skip the test
// if those don't exist. That means that in local testing, other methods of providing credentials
// (such as ~/.aws/credentials) will not work.
// These tests time out on Windows for some reason; skip them until we figure out why
#[cfg(not(target_os = "windows"))]
#[cfg(test)]
mod tests {
    use gst::prelude::*;
    use gstaws::s3utils::{AWS_BEHAVIOR_VERSION, DEFAULT_S3_REGION};

    fn init() {
        use std::sync::Once;
        static INIT: Once = Once::new();

        INIT.call_once(|| {
            gst::init().unwrap();
            gstaws::plugin_register_static().unwrap();
            // Makes it easier to get AWS SDK logs if needed
            env_logger::init();
        });
    }
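
    // Build a test buffer with a fixed 200ms PTS; the flush-interval-time test below relies on
    // the buffers carrying a timestamp.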
    fn make_buffer(content: &[u8]) -> gst::Buffer {
        let mut buf = gst::Buffer::from_slice(content.to_owned());
        buf.make_mut().set_pts(gst::ClockTime::from_mseconds(200));
        buf
    }
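
    // Cleanup helper: delete the uploaded object from the bucket once a test is done.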
    async fn delete_object(region: String, bucket: &str, key: &str) {
        let region_provider = aws_config::meta::region::RegionProviderChain::first_try(
            aws_sdk_s3::config::Region::new(region),
        )
        .or_default_provider();

        let config = aws_config::defaults(AWS_BEHAVIOR_VERSION.clone())
            .region(region_provider)
            .load()
            .await;
        let client = aws_sdk_s3::Client::new(&config);

        client
            .delete_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
            .unwrap();
    }

    // Common helper for the awss3sink (multipart upload) tests: push a few buffers, read the
    // object back with awss3src and verify the contents, then delete it.
    async fn do_s3_multipart_test(key_prefix: &str) {
        init();

        let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
        let bucket =
            std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
        let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
        let uri = format!("s3://{region}/{bucket}/{key}");
        let content = "Hello, world!\n".as_bytes();

        // Manually add the element so we can configure it before it goes to PLAYING
        let mut h1 = gst_check::Harness::new_empty();
        // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating
        // and adding an element manually
        h1.add_parse(format!("awss3sink uri=\"{uri}\"").as_str());

        h1.set_src_caps(gst::Caps::builder("text/plain").build());
        h1.play();

        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push_event(gst::event::Eos::new());
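
        // Read the object back with awss3src and check that all five buffers were written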
        let mut h2 = gst_check::Harness::new("awss3src");
        h2.element().unwrap().set_property("uri", uri.clone());
        h2.play();

        let buf = h2.pull_until_eos().unwrap().unwrap();
        assert_eq!(
            content.repeat(5),
            buf.into_mapped_buffer_readable().unwrap().as_slice()
        );

        delete_object(region.clone(), &bucket, &key).await;
    }

    // Common helper for the awss3putobjectsink tests: push a few buffers with the requested flush
    // thresholds, finish with or without EOS, then read the object back with awss3src and verify.
    async fn do_s3_putobject_test(
        key_prefix: &str,
        buffers: Option<u64>,
        bytes: Option<u64>,
        time: Option<gst::ClockTime>,
        do_eos: bool,
    ) {
        init();

        let region = std::env::var("AWS_REGION").unwrap_or_else(|_| DEFAULT_S3_REGION.to_string());
        let bucket =
            std::env::var("AWS_S3_BUCKET").unwrap_or_else(|_| "gst-plugins-rs-tests".to_string());
        let key = format!("{key_prefix}-{:?}.txt", chrono::Utc::now());
        let uri = format!("s3://{region}/{bucket}/{key}");
        let content = "Hello, world!\n".as_bytes();

        // Manually add the element so we can configure it before it goes to PLAYING
        let mut h1 = gst_check::Harness::new_empty();
        // Need to add_parse() because the Harness API / Rust bindings aren't conducive to creating
        // and adding an element manually
        h1.add_parse(
            format!("awss3putobjectsink key=\"{key}\" region=\"{region}\" bucket=\"{bucket}\" name=\"sink\"")
                .as_str(),
        );
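
        // add_parse() wraps the parsed description in a bin, so fetch the actual sink element by
        // name before setting properties on it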
        let h1el = h1
            .element()
            .unwrap()
            .dynamic_cast::<gst::Bin>()
            .unwrap()
            .by_name("sink")
            .unwrap();
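
        // Only set the flush thresholds a given test asks for; anything left as None keeps the
        // element's default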
        if let Some(b) = buffers {
            h1el.set_property("flush-interval-buffers", b)
        };
        if let Some(b) = bytes {
            h1el.set_property("flush-interval-bytes", b)
        };
        if time.is_some() {
            h1el.set_property("flush-interval-time", time)
        };
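        // When the test skips EOS it relies on flush-on-error to write out the data at teardown
        // (see test_s3_put_object_without_eos below)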
        if !do_eos {
            h1el.set_property("flush-on-error", true)
        }

        h1.set_src_caps(gst::Caps::builder("text/plain").build());
        h1.play();

        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();
        h1.push(make_buffer(content)).unwrap();

        if do_eos {
            h1.push_event(gst::event::Eos::new());
        } else {
            // teardown to trigger end
            drop(h1);
        }
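
        // Read the object back with awss3src and check that all five buffers were written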
        let mut h2 = gst_check::Harness::new("awss3src");
        h2.element().unwrap().set_property("uri", uri.clone());
        h2.play();

        let buf = h2.pull_until_eos().unwrap().unwrap();
        assert_eq!(
            content.repeat(5),
            buf.into_mapped_buffer_readable().unwrap().as_slice()
        );

        delete_object(region.clone(), &bucket, &key).await;
    }
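
    // The tests below only run when AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are set (see the
    // note at the top of the file); otherwise they are skipped.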
    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_multipart_simple() {
        do_s3_multipart_test("s3-test").await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_multipart_whitespace() {
        do_s3_multipart_test("s3 test").await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_multipart_unicode() {
        do_s3_multipart_test("s3 🧪 😱").await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_simple() {
        do_s3_putobject_test("s3-put-object-test", None, None, None, true).await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_whitespace() {
        do_s3_putobject_test("s3 put object test", None, None, None, true).await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_unicode() {
        do_s3_putobject_test("s3 put object 🧪 😱", None, None, None, true).await;
    }
    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_flush_buffers() {
        // Awkward threshold as we push 5 buffers
        do_s3_putobject_test("s3-put-object-test fbuf", Some(2), None, None, true).await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_flush_bytes() {
        // Awkward threshold as we push 14 bytes per buffer
        do_s3_putobject_test("s3-put-object-test fbytes", None, Some(30), None, true).await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_flush_time() {
        do_s3_putobject_test(
            "s3-put-object-test ftime",
            None,
            None,
            // Awkward threshold as each buffer carries a 200ms PTS
            Some(gst::ClockTime::from_mseconds(300)),
            true,
        )
        .await;
    }
    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_on_eos() {
        // Disable all flush thresholds, so only EOS causes a flush
        do_s3_putobject_test(
            "s3-put-object-test eos",
            Some(0),
            Some(0),
            Some(gst::ClockTime::from_nseconds(0)),
            true,
        )
        .await;
    }

    #[test_with::env(AWS_ACCESS_KEY_ID)]
    #[test_with::env(AWS_SECRET_ACCESS_KEY)]
    #[tokio::test]
    async fn test_s3_put_object_without_eos() {
        // Disable all flush thresholds, skip EOS, and cause a flush on error
        do_s3_putobject_test(
            "s3-put-object-test !eos",
            Some(0),
            Some(0),
            Some(gst::ClockTime::from_nseconds(0)),
            false,
        )
        .await;
    }
}