diff --git a/net/aws/src/s3hlssink/imp.rs b/net/aws/src/s3hlssink/imp.rs index e4f47d9d5..138e0f384 100644 --- a/net/aws/src/s3hlssink/imp.rs +++ b/net/aws/src/s3hlssink/imp.rs @@ -108,6 +108,7 @@ struct S3Upload { s3_acl: ObjectCannedAcl, s3_tx: SyncSender, s3_data: Vec, + last_flush_len: usize, } struct S3UploadReq { @@ -170,26 +171,28 @@ impl S3Upload { s3_acl, s3_data: Vec::new(), s3_tx, + last_flush_len: 0, } } -} -impl Write for S3Upload { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - gst::log!(CAT, "Write {}, {}", self.s3_key, buf.len()); - self.s3_data.extend_from_slice(buf); - Ok(buf.len()) - } + fn flush(&mut self, consume: bool) { + let s3_data_len = self.s3_data.len(); + if self.last_flush_len == s3_data_len { + // Nothing new was written, nothing to flush + gst::debug!( + CAT, + "Buffer length ({}) unchanged, skipping flush for {}", + s3_data_len, + self.s3_key + ); + return; + } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -impl Drop for S3Upload { - fn drop(&mut self) { - let s3_data: Vec = self.s3_data.drain(0..).collect(); - let s3_data_len = s3_data.len(); + let s3_data: Vec = if consume { + self.s3_data.drain(0..).collect() + } else { + self.s3_data.clone() + }; let s3_tx = &mut self.s3_tx; let s3_channel = S3UploadReq { s3_client: self.s3_client.clone(), @@ -201,12 +204,14 @@ impl Drop for S3Upload { gst::debug!( CAT, - "Submitting upload request for key: {}", + "Submitting {}consuming upload request for key: {}", + if consume { "" } else { "non-" }, s3_channel.s3_key, ); match s3_tx.send(S3Request::Upload(s3_channel)) { Ok(()) => { + self.last_flush_len = s3_data_len; gst::debug!( CAT, "Send S3 key {} of data length {} succeeded", @@ -231,6 +236,25 @@ impl Drop for S3Upload { } } +impl Write for S3Upload { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + gst::log!(CAT, "Write {}, {}", self.s3_key, buf.len()); + self.s3_data.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.flush(false); + Ok(()) + } +} + +impl Drop for S3Upload { + fn drop(&mut self) { + self.flush(true); + } +} + impl S3HlsSink { fn s3_request(&self, rxc: Receiver, rx: Receiver) { loop {