Add BytesReader for streaming image decoding for blurhashes

This commit is contained in:
asonix 2025-02-08 14:21:52 -06:00
parent ec9e21f318
commit 939cd7955c
3 changed files with 528 additions and 13 deletions

View file

@ -3,10 +3,11 @@ use std::{
time::Duration, time::Duration,
}; };
use streem::IntoStreamer;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use crate::{ use crate::{
bytes_stream::BytesStream, bytes_reader::BytesReader,
details::Details, details::Details,
error::{Error, UploadError}, error::{Error, UploadError},
formats::ProcessableFormat, formats::ProcessableFormat,
@ -73,16 +74,10 @@ where
| format @ image::ImageFormat::Png | format @ image::ImageFormat::Png
| format @ image::ImageFormat::WebP, | format @ image::ImageFormat::WebP,
) => { ) => {
let bytes_stream = BytesStream::try_from_stream(stream).await?; let (tx, bytes_reader) = BytesReader::new(8);
let blurhash = crate::sync::spawn_blocking("image-blurhash", move || { let blurhash_task = crate::sync::spawn_blocking("image-blurhash", move || {
let mut vec = Vec::with_capacity(bytes_stream.len()); let raw_image = image::ImageReader::with_format(bytes_reader, format)
for bytes in bytes_stream {
vec.extend(bytes);
}
let raw_image = image::ImageReader::with_format(std::io::Cursor::new(vec), format)
.decode() .decode()
.map_err(UploadError::Decode)? .map_err(UploadError::Decode)?
.into_rgba8(); .into_rgba8();
@ -96,9 +91,17 @@ where
); );
Ok(blurhash) as Result<_, UploadError> Ok(blurhash) as Result<_, UploadError>
}) });
.await
.map_err(|_| UploadError::Canceled)??; let stream = std::pin::pin!(stream);
let mut streamer = stream.into_streamer();
while let Some(res) = streamer.next().await {
tx.send(res).await.map_err(|_| UploadError::Canceled)?;
}
drop(tx);
let blurhash = blurhash_task.await.map_err(|_| UploadError::Canceled)??;
Ok(blurhash) Ok(blurhash)
} }

511
src/bytes_reader.rs Normal file
View file

@ -0,0 +1,511 @@
use actix_web::web::Bytes;
pub(crate) struct BytesReader {
rx: tokio::sync::mpsc::Receiver<std::io::Result<Bytes>>,
buf: Vec<Bytes>,
position: usize,
len: usize,
}
impl BytesReader {
pub(crate) fn new(buffer: usize) -> (tokio::sync::mpsc::Sender<std::io::Result<Bytes>>, Self) {
let (tx, rx) = tokio::sync::mpsc::channel(buffer);
(
tx,
Self {
rx,
buf: Vec::new(),
position: 0,
len: 0,
},
)
}
}
impl BytesReader {
fn try_recv(&mut self) -> std::io::Result<()> {
while let Ok(res) = self.rx.try_recv() {
let bytes = res?;
self.len += bytes.len();
self.buf.push(bytes);
}
Ok(())
}
fn blocking_recv(&mut self) -> std::io::Result<bool> {
if let Some(bytes) = self.rx.blocking_recv().transpose()? {
self.len += bytes.len();
self.buf.push(bytes);
Ok(true)
} else {
Ok(false)
}
}
}
impl std::io::BufRead for BytesReader {
fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
self.try_recv()?;
loop {
let mut position = self.position;
let mut index = None;
for (i, bytes) in self.buf.iter().enumerate() {
if position < bytes.len() {
index = Some(i);
break;
} else {
position -= bytes.len();
}
}
if let Some(i) = index {
return Ok(&self.buf[i][position..]);
} else if !self.blocking_recv()? {
return Ok(&[]);
}
}
}
fn consume(&mut self, amt: usize) {
self.position += amt;
}
}
impl std::io::Read for BytesReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.try_recv()?;
let mut written = 0;
loop {
let mut position = self.position;
for bytes in &self.buf {
if position < bytes.len() {
let copy_len = (bytes.len() - position).min(buf.len() - written);
buf[written..written + copy_len]
.copy_from_slice(&bytes[position..position + copy_len]);
position = 0;
self.position += copy_len;
written += copy_len;
if written == buf.len() {
return Ok(written);
}
} else {
position -= bytes.len();
}
}
if !self.blocking_recv()? {
return Ok(written);
}
}
}
}
impl std::io::Seek for BytesReader {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
match pos {
std::io::SeekFrom::Start(pos) => {
// any positive seek is valid, seeks past the current length
// will cause blocking during reads and Ok(0) for past-end reads
self.position = pos as usize;
}
std::io::SeekFrom::End(pos) => {
// read all bytes into memory for end-seeking
while self.blocking_recv()? {}
if pos < 0 && pos.unsigned_abs() as usize > self.len {
todo!("before-zero seek")
}
self.position = (self.len as i64 + pos) as usize;
}
std::io::SeekFrom::Current(pos) => {
if pos < 0 && pos.unsigned_abs() as usize > self.position {
todo!("before-zero seek")
}
self.position = (self.position as i64 + pos) as usize;
}
}
Ok(self.position as u64)
}
}
#[test]
fn read_existing_bytes() {
use std::io::Read;
let (tx, mut reader) = BytesReader::new(8);
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
let mut buf = [0u8; 16];
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 16);
assert_eq!(&buf, b"hellohowdyhewwoh");
}
#[test]
fn buf_read_existing_bytes() {
use std::io::BufRead;
let (tx, mut reader) = BytesReader::new(8);
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
drop(tx);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hello", "hello");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"howdy", "howdy");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hewwo", "hewwo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"henlo", "henlo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf.len(), 0);
}
#[test]
fn read_pending_bytes() {
use std::io::Read;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
let mut buf = [0u8; 16];
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 16);
assert_eq!(&buf, b"hellohowdyhewwoh");
join_handle.join().unwrap();
}
#[test]
fn buf_read_pending_bytes() {
use std::io::BufRead;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hello", "hello");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"howdy", "howdy");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hewwo", "hewwo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"henlo", "henlo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf.len(), 0);
join_handle.join().unwrap();
}
#[test]
fn seek_and_read_existing_bytes() {
use std::io::Read;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
let mut buf = [0u8; 16];
reader.seek(std::io::SeekFrom::Start(4)).unwrap();
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 16);
assert_eq!(&buf, b"ohowdyhewwohenlo");
}
#[test]
fn seek_and_buf_read_existing_bytes() {
use std::io::BufRead;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
drop(tx);
reader.seek(std::io::SeekFrom::Start(4)).unwrap();
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"o", "o");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"howdy", "howdy");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hewwo", "hewwo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"henlo", "henlo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf.len(), 0);
}
#[test]
fn seek_and_read_pending_bytes() {
use std::io::Read;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
let mut buf = [0u8; 16];
reader.seek(std::io::SeekFrom::Start(4)).unwrap();
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 16);
assert_eq!(&buf, b"ohowdyhewwohenlo");
join_handle.join().unwrap();
}
#[test]
fn seek_and_buf_read_pending_bytes() {
use std::io::BufRead;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
reader.seek(std::io::SeekFrom::Start(4)).unwrap();
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"o", "o");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"howdy", "howdy");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hewwo", "hewwo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"henlo", "henlo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf.len(), 0);
join_handle.join().unwrap();
}
#[test]
fn seek_and_read_fewer_pending_bytes() {
use std::io::Read;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
let mut buf = [0u8; 16];
reader.seek(std::io::SeekFrom::Start(8)).unwrap();
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 12);
assert_eq!(&buf[..n], b"dyhewwohenlo");
join_handle.join().unwrap();
}
#[test]
fn seek_and_buf_read_fewer_pending_bytes() {
use std::io::BufRead;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
reader.seek(std::io::SeekFrom::Start(8)).unwrap();
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"dy", "dy");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"hewwo", "hewwo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"henlo", "henlo");
let n = buf.len();
reader.consume(n);
let buf = reader.fill_buf().unwrap();
assert_eq!(buf.len(), 0);
join_handle.join().unwrap();
}
#[test]
fn seek_past_end_pending_bytes() {
use std::io::Read;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
let mut buf = [0u8; 16];
reader.seek(std::io::SeekFrom::Start(30)).unwrap();
let n = reader.read(&mut buf).unwrap();
assert_eq!(n, 0);
join_handle.join().unwrap();
}
#[test]
fn seek_past_end_buf_pending_bytes() {
use std::io::BufRead;
use std::io::Seek;
let (tx, mut reader) = BytesReader::new(8);
let join_handle = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
tx.blocking_send(Ok(Bytes::from(&b"hello"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"howdy"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"hewwo"[..]))).unwrap();
tx.blocking_send(Ok(Bytes::from(&b"henlo"[..]))).unwrap();
});
reader.seek(std::io::SeekFrom::Start(30)).unwrap();
let buf = reader.fill_buf().unwrap();
assert_eq!(buf, b"");
join_handle.join().unwrap();
}

View file

@ -1,5 +1,6 @@
mod backgrounded; mod backgrounded;
mod blurhash; mod blurhash;
mod bytes_reader;
mod bytes_stream; mod bytes_stream;
mod config; mod config;
mod details; mod details;