aws: s3hlssink: Write to S3 on OutputStream flush

It is quite brittle to depend on the signal order from splitmuxsink
or hlssink3 (or worse, on when S3Upload gets drop()ed). We implement
flush() on the stream instead, with a simple check that there is new
data, to avoid duplicate uploads.

If we get the ordering wrong, we upload a new segment _after_ the
playlist update that references it, which, besides being wrong, could
break players.

I had done this in the initial implementation, but the flush() alone was
insufficient for reasons I no longer fully recall. Doing a flush in both
flush() and drop() should cover all the cases I found not to work. In
addition, since this is an append-only stream, we can do a simple length
check to avoid redundant uploads.
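
To make that concrete, here is a minimal, self-contained sketch of the append-only length check. The names are simplified, and `upload()` is a hypothetical stand-in for submitting the request to the upload thread; it is not the patch itself (see the diff below for the real code):

```rust
// Minimal sketch of the dedup-on-flush pattern described above.
// `Upload` and `upload()` are simplified stand-ins, not the real element.
struct Upload {
    data: Vec<u8>,          // append-only buffer of everything written so far
    last_flush_len: usize,  // buffer length at the last successful flush
}

impl Upload {
    fn flush(&mut self, consume: bool) {
        let len = self.data.len();
        // Append-only stream: an unchanged length means nothing new was
        // written, so flush() followed by drop() uploads only once.
        if self.last_flush_len == len {
            return;
        }

        // The final flush (from drop) can hand the buffer off; earlier
        // flushes upload a snapshot and keep accumulating.
        let data: Vec<u8> = if consume {
            self.data.drain(..).collect()
        } else {
            self.data.clone()
        };

        if upload(data).is_ok() {
            self.last_flush_len = len;
        }
    }
}

// Hypothetical stand-in: the real element sends an S3UploadReq over a
// SyncSender to a dedicated upload thread instead.
fn upload(data: Vec<u8>) -> Result<(), ()> {
    println!("uploading {} bytes", data.len());
    Ok(())
}

fn main() {
    let mut u = Upload { data: Vec::new(), last_flush_len: 0 };
    u.data.extend_from_slice(b"segment data");
    u.flush(false); // uploads 12 bytes
    u.flush(false); // skipped: length unchanged since the last flush
}
```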

Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/701
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2330>
Author: Arun Raghavan, 2025-06-30 10:34:07 -04:00
Committed-by: GStreamer Marge Bot
parent b271edc3f8
commit efaab53ab3


@@ -108,6 +108,7 @@ struct S3Upload {
     s3_acl: ObjectCannedAcl,
     s3_tx: SyncSender<S3Request>,
     s3_data: Vec<u8>,
+    last_flush_len: usize,
 }
 
 struct S3UploadReq {
@@ -170,26 +171,28 @@ impl S3Upload {
             s3_acl,
             s3_data: Vec::new(),
             s3_tx,
+            last_flush_len: 0,
         }
     }
-}
 
-impl Write for S3Upload {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        gst::log!(CAT, "Write {}, {}", self.s3_key, buf.len());
-        self.s3_data.extend_from_slice(buf);
-        Ok(buf.len())
-    }
+    fn flush(&mut self, consume: bool) {
+        let s3_data_len = self.s3_data.len();
+        if self.last_flush_len == s3_data_len {
+            // Nothing new was written, nothing to flush
+            gst::debug!(
+                CAT,
+                "Buffer length ({}) unchanged, skipping flush for {}",
+                s3_data_len,
+                self.s3_key
+            );
+            return;
+        }
 
-    fn flush(&mut self) -> std::io::Result<()> {
-        Ok(())
-    }
-}
-
-impl Drop for S3Upload {
-    fn drop(&mut self) {
-        let s3_data: Vec<u8> = self.s3_data.drain(0..).collect();
-        let s3_data_len = s3_data.len();
+        let s3_data: Vec<u8> = if consume {
+            self.s3_data.drain(0..).collect()
+        } else {
+            self.s3_data.clone()
+        };
         let s3_tx = &mut self.s3_tx;
         let s3_channel = S3UploadReq {
             s3_client: self.s3_client.clone(),
@@ -201,12 +204,14 @@ impl Drop for S3Upload {
 
         gst::debug!(
             CAT,
-            "Submitting upload request for key: {}",
+            "Submitting {}consuming upload request for key: {}",
+            if consume { "" } else { "non-" },
             s3_channel.s3_key,
         );
 
         match s3_tx.send(S3Request::Upload(s3_channel)) {
             Ok(()) => {
+                self.last_flush_len = s3_data_len;
                 gst::debug!(
                     CAT,
                     "Send S3 key {} of data length {} succeeded",
@@ -231,6 +236,25 @@ impl Drop for S3Upload {
     }
 }
 
+impl Write for S3Upload {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        gst::log!(CAT, "Write {}, {}", self.s3_key, buf.len());
+        self.s3_data.extend_from_slice(buf);
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.flush(false);
+        Ok(())
+    }
+}
+
+impl Drop for S3Upload {
+    fn drop(&mut self) {
+        self.flush(true);
+    }
+}
+
 impl S3HlsSink {
     fn s3_request(&self, rxc: Receiver<S3RequestControl>, rx: Receiver<S3Request>) {
         loop {
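
A side note on the new Write and Drop impls above: inside Write::flush, `self.flush(false)` resolves to the inherent `flush(&mut self, consume: bool)` rather than recursing into the trait method, because Rust's method lookup prefers inherent methods over trait methods. A small self-contained sketch of that resolution rule (the `Buf` type is illustrative, not from the patch):

```rust
use std::io::Write;

struct Buf;

impl Buf {
    // Inherent method: preferred during method resolution.
    fn flush(&mut self, consume: bool) {
        println!("inherent flush, consume = {consume}");
    }
}

impl Write for Buf {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        // Calls the inherent flush(bool) above, not this trait method.
        self.flush(false);
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut b = Buf;
    // The trait method is reached via a qualified path here; a plain
    // `b.flush()` would select the inherent method first and then fail
    // type checking because the `consume` argument is missing.
    Write::flush(&mut b)
}
```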