From efaab53ab3640a2fa22c59b48b9c57da53e7bb20 Mon Sep 17 00:00:00 2001 From: Arun Raghavan Date: Mon, 30 Jun 2025 10:34:07 -0400 Subject: [PATCH] aws: s3hlssink: Write to S3 on OutputStream flush It is quite brittle to try to depend on signal order from splitmuxsink or hlssink3 (or worse, when S3Upload gets drop()ed). We implement flush() on the stream instead, with a simple check to make sure there's new data, to avoid duplicate uploads. If we get the ordering wrong, we upload new segments _after_ the corresponding playlist update referencing that segment, which, besides being wrong, could break players. I had done this in the initial implementation, but as far as I remember the flush() was insufficient, for reasons I don't fully recall. Doing a flush in both flush() and drop() should hopefully cover all the cases that I found not to work. In addition, since this is an append-only stream, we can do a simple length check to avoid redundant uploads. Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/701 Part-of: --- net/aws/src/s3hlssink/imp.rs | 58 +++++++++++++++++++++++++----------- 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/net/aws/src/s3hlssink/imp.rs b/net/aws/src/s3hlssink/imp.rs index e4f47d9d5..138e0f384 100644 --- a/net/aws/src/s3hlssink/imp.rs +++ b/net/aws/src/s3hlssink/imp.rs @@ -108,6 +108,7 @@ struct S3Upload { s3_acl: ObjectCannedAcl, s3_tx: SyncSender<S3Request>, s3_data: Vec<u8>, + last_flush_len: usize, } struct S3UploadReq { @@ -170,26 +171,28 @@ impl S3Upload { s3_acl, s3_data: Vec::new(), s3_tx, + last_flush_len: 0, } } -} -impl Write for S3Upload { - fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { - gst::log!(CAT, "Write {}, {}", self.s3_key, buf.len()); - self.s3_data.extend_from_slice(buf); - Ok(buf.len()) - } + fn flush(&mut self, consume: bool) { + let s3_data_len = self.s3_data.len(); + if self.last_flush_len == s3_data_len { + // Nothing new was written, nothing to flush + gst::debug!( + CAT, + "Buffer length ({}) 
unchanged, skipping flush for {}", s3_data_len, self.s3_key ); return; } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -impl Drop for S3Upload { - fn drop(&mut self) { - let s3_data: Vec<u8> = self.s3_data.drain(0..).collect(); - let s3_data_len = s3_data.len(); + let s3_data: Vec<u8> = if consume { + self.s3_data.drain(0..).collect() + } else { + self.s3_data.clone() + }; let s3_tx = &mut self.s3_tx; let s3_channel = S3UploadReq { s3_client: self.s3_client.clone(), @@ -201,12 +204,14 @@ gst::debug!( CAT, - "Submitting upload request for key: {}", + "Submitting {}consuming upload request for key: {}", + if consume { "" } else { "non-" }, s3_channel.s3_key, ); match s3_tx.send(S3Request::Upload(s3_channel)) { Ok(()) => { + self.last_flush_len = s3_data_len; gst::debug!( CAT, "Send S3 key {} of data length {} succeeded", @@ -231,6 +236,25 @@ } } +impl Write for S3Upload { + fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> { + gst::log!(CAT, "Write {}, {}", self.s3_key, buf.len()); + self.s3_data.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.flush(false); + Ok(()) + } +} + +impl Drop for S3Upload { + fn drop(&mut self) { + self.flush(true); + } +} + impl S3HlsSink { fn s3_request(&self, rxc: Receiver, rx: Receiver) { loop {