From 939cd7955ccc05f38682e4ca1d1f3774d5f8872b Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 8 Feb 2025 14:21:52 -0600 Subject: [PATCH] Add BytesReader for streaming image decoding for blurhashes --- src/blurhash.rs | 29 +-- src/bytes_reader.rs | 511 ++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 3 files changed, 528 insertions(+), 13 deletions(-) create mode 100644 src/bytes_reader.rs diff --git a/src/blurhash.rs b/src/blurhash.rs index ec6570d..42ee569 100644 --- a/src/blurhash.rs +++ b/src/blurhash.rs @@ -3,10 +3,11 @@ use std::{ time::Duration, }; +use streem::IntoStreamer; use tokio::io::AsyncReadExt; use crate::{ - bytes_stream::BytesStream, + bytes_reader::BytesReader, details::Details, error::{Error, UploadError}, formats::ProcessableFormat, @@ -73,16 +74,10 @@ where | format @ image::ImageFormat::Png | format @ image::ImageFormat::WebP, ) => { - let bytes_stream = BytesStream::try_from_stream(stream).await?; + let (tx, bytes_reader) = BytesReader::new(8); - let blurhash = crate::sync::spawn_blocking("image-blurhash", move || { - let mut vec = Vec::with_capacity(bytes_stream.len()); - - for bytes in bytes_stream { - vec.extend(bytes); - } - - let raw_image = image::ImageReader::with_format(std::io::Cursor::new(vec), format) + let blurhash_task = crate::sync::spawn_blocking("image-blurhash", move || { + let raw_image = image::ImageReader::with_format(bytes_reader, format) .decode() .map_err(UploadError::Decode)? .into_rgba8(); @@ -96,9 +91,17 @@ where ); Ok(blurhash) as Result<_, UploadError> - }) - .await - .map_err(|_| UploadError::Canceled)??; + }); + + let stream = std::pin::pin!(stream); + let mut streamer = stream.into_streamer(); + + while let Some(res) = streamer.next().await { + tx.send(res).await.map_err(|_| UploadError::Canceled)?; + } + drop(tx); + + let blurhash = blurhash_task.await.map_err(|_| UploadError::Canceled)??; Ok(blurhash) } diff --git a/src/bytes_reader.rs b/src/bytes_reader.rs new file mode 100644 index 0000000..51897b5 --- /dev/null +++ b/src/bytes_reader.rs @@ -0,0 +1,511 @@ +use actix_web::web::Bytes; + +pub(crate) struct BytesReader { + rx: tokio::sync::mpsc::Receiver>, + buf: Vec, + position: usize, + len: usize, +} + +impl BytesReader { + pub(crate) fn new(buffer: usize) -> (tokio::sync::mpsc::Sender>, Self) { + let (tx, rx) = tokio::sync::mpsc::channel(buffer); + + ( + tx, + Self { + rx, + buf: Vec::new(), + position: 0, + len: 0, + }, + ) + } +} + +impl BytesReader { + fn try_recv(&mut self) -> std::io::Result<()> { + while let Ok(res) = self.rx.try_recv() { + let bytes = res?; + self.len += bytes.len(); + self.buf.push(bytes); + } + + Ok(()) + } + + fn blocking_recv(&mut self) -> std::io::Result { + if let Some(bytes) = self.rx.blocking_recv().transpose()? { + self.len += bytes.len(); + self.buf.push(bytes); + Ok(true) + } else { + Ok(false) + } + } +} + +impl std::io::BufRead for BytesReader { + fn fill_buf(&mut self) -> std::io::Result<&[u8]> { + self.try_recv()?; + + loop { + let mut position = self.position; + + let mut index = None; + + for (i, bytes) in self.buf.iter().enumerate() { + if position < bytes.len() { + index = Some(i); + break; + } else { + position -= bytes.len(); + } + } + + if let Some(i) = index { + return Ok(&self.buf[i][position..]); + } else if !self.blocking_recv()? { + return Ok(&[]); + } + } + } + + fn consume(&mut self, amt: usize) { + self.position += amt; + } +} + +impl std::io::Read for BytesReader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.try_recv()?; + + let mut written = 0; + + loop { + let mut position = self.position; + + for bytes in &self.buf { + if position < bytes.len() { + let copy_len = (bytes.len() - position).min(buf.len() - written); + + buf[written..written + copy_len] + .copy_from_slice(&bytes[position..position + copy_len]); + position = 0; + self.position += copy_len; + written += copy_len; + + if written == buf.len() { + return Ok(written); + } + } else { + position -= bytes.len(); + } + } + + if !self.blocking_recv()? { + return Ok(written); + } + } + } +} + +impl std::io::Seek for BytesReader { + fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result { + match pos { + std::io::SeekFrom::Start(pos) => { + // any positive seek is valid, seeks past the current length + // will cause blocking during reads and Ok(0) for past-end reads + self.position = pos as usize; + } + std::io::SeekFrom::End(pos) => { + // read all bytes into memory for end-seeking + while self.blocking_recv()? {} + + if pos < 0 && pos.unsigned_abs() as usize > self.len { + todo!("before-zero seek") + } + + self.position = (self.len as i64 + pos) as usize; + } + std::io::SeekFrom::Current(pos) => { + if pos < 0 && pos.unsigned_abs() as usize > self.position { + todo!("before-zero seek") + } + + self.position = (self.position as i64 + pos) as usize; + } + } + + Ok(self.position as u64) + } +} + +#[test] +fn read_existing_bytes() { + use std::io::Read; + + let (tx, mut reader) = BytesReader::new(8); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + + let mut buf = [0u8; 16]; + + let n = reader.read(&mut buf).unwrap(); + + assert_eq!(n, 16); + assert_eq!(&buf, b"hellohowdyhewwoh"); +} + +#[test] +fn buf_read_existing_bytes() { + use std::io::BufRead; + + let (tx, mut reader) = BytesReader::new(8); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + + drop(tx); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hello", "hello"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"howdy", "howdy"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hewwo", "hewwo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"henlo", "henlo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf.len(), 0); +} + +#[test] +fn read_pending_bytes() { + use std::io::Read; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + let mut buf = [0u8; 16]; + + let n = reader.read(&mut buf).unwrap(); + + assert_eq!(n, 16); + assert_eq!(&buf, b"hellohowdyhewwoh"); + + join_handle.join().unwrap(); +} + +#[test] +fn buf_read_pending_bytes() { + use std::io::BufRead; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hello", "hello"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"howdy", "howdy"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hewwo", "hewwo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"henlo", "henlo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf.len(), 0); + + join_handle.join().unwrap(); +} + +#[test] +fn seek_and_read_existing_bytes() { + use std::io::Read; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + + let mut buf = [0u8; 16]; + + reader.seek(std::io::SeekFrom::Start(4)).unwrap(); + let n = reader.read(&mut buf).unwrap(); + + assert_eq!(n, 16); + assert_eq!(&buf, b"ohowdyhewwohenlo"); +} + +#[test] +fn seek_and_buf_read_existing_bytes() { + use std::io::BufRead; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + drop(tx); + + reader.seek(std::io::SeekFrom::Start(4)).unwrap(); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"o", "o"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"howdy", "howdy"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hewwo", "hewwo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"henlo", "henlo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf.len(), 0); +} + +#[test] +fn seek_and_read_pending_bytes() { + use std::io::Read; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + let mut buf = [0u8; 16]; + + reader.seek(std::io::SeekFrom::Start(4)).unwrap(); + let n = reader.read(&mut buf).unwrap(); + + assert_eq!(n, 16); + assert_eq!(&buf, b"ohowdyhewwohenlo"); + + join_handle.join().unwrap(); +} + +#[test] +fn seek_and_buf_read_pending_bytes() { + use std::io::BufRead; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + reader.seek(std::io::SeekFrom::Start(4)).unwrap(); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"o", "o"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"howdy", "howdy"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hewwo", "hewwo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"henlo", "henlo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf.len(), 0); + + join_handle.join().unwrap(); +} + +#[test] +fn seek_and_read_fewer_pending_bytes() { + use std::io::Read; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + let mut buf = [0u8; 16]; + + reader.seek(std::io::SeekFrom::Start(8)).unwrap(); + let n = reader.read(&mut buf).unwrap(); + + assert_eq!(n, 12); + assert_eq!(&buf[..n], b"dyhewwohenlo"); + + join_handle.join().unwrap(); +} + +#[test] +fn seek_and_buf_read_fewer_pending_bytes() { + use std::io::BufRead; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + reader.seek(std::io::SeekFrom::Start(8)).unwrap(); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"dy", "dy"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"hewwo", "hewwo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf, b"henlo", "henlo"); + let n = buf.len(); + reader.consume(n); + + let buf = reader.fill_buf().unwrap(); + assert_eq!(buf.len(), 0); + + join_handle.join().unwrap(); +} + +#[test] +fn seek_past_end_pending_bytes() { + use std::io::Read; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + let mut buf = [0u8; 16]; + + reader.seek(std::io::SeekFrom::Start(30)).unwrap(); + let n = reader.read(&mut buf).unwrap(); + + assert_eq!(n, 0); + + join_handle.join().unwrap(); +} + +#[test] +fn seek_past_end_buf_pending_bytes() { + use std::io::BufRead; + use std::io::Seek; + + let (tx, mut reader) = BytesReader::new(8); + + let join_handle = std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_secs(1)); + + tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap(); + tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap(); + }); + + reader.seek(std::io::SeekFrom::Start(30)).unwrap(); + let buf = reader.fill_buf().unwrap(); + + assert_eq!(buf, b""); + + join_handle.join().unwrap(); +} diff --git a/src/lib.rs b/src/lib.rs index 5450836..a797c84 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ mod backgrounded; mod blurhash; +mod bytes_reader; mod bytes_stream; mod config; mod details;