From befb9c8196ffa124cefc533312b7361390e5d9b9 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 4 Jul 2024 00:37:25 +0100 Subject: [PATCH] refactor(multipart): move Payload* to module --- actix-multipart/Cargo.toml | 1 - actix-multipart/src/form/bytes.rs | 5 +- actix-multipart/src/form/json.rs | 3 +- actix-multipart/src/lib.rs | 1 + actix-multipart/src/payload.rs | 130 ++++++++++++++++++++++++++++++ actix-multipart/src/server.rs | 130 +++--------------------------- actix-multipart/src/test.rs | 6 +- 7 files changed, 147 insertions(+), 129 deletions(-) create mode 100644 actix-multipart/src/payload.rs diff --git a/actix-multipart/Cargo.toml b/actix-multipart/Cargo.toml index c5ff7dd10..e836a2c09 100644 --- a/actix-multipart/Cargo.toml +++ b/actix-multipart/Cargo.toml @@ -41,7 +41,6 @@ actix-multipart-derive = { version = "=0.6.1", optional = true } actix-utils = "3" actix-web = { version = "4", default-features = false } -bytes = "1" derive_more = "0.99.5" futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] } diff --git a/actix-multipart/src/form/bytes.rs b/actix-multipart/src/form/bytes.rs index c152db3d0..51b0cf7d9 100644 --- a/actix-multipart/src/form/bytes.rs +++ b/actix-multipart/src/form/bytes.rs @@ -1,7 +1,6 @@ //! Reads a field into memory. -use actix_web::HttpRequest; -use bytes::BytesMut; +use actix_web::{web::BytesMut, HttpRequest}; use futures_core::future::LocalBoxFuture; use futures_util::TryStreamExt as _; use mime::Mime; @@ -15,7 +14,7 @@ use crate::{ #[derive(Debug)] pub struct Bytes { /// The data. - pub data: bytes::Bytes, + pub data: actix_web::web::Bytes, /// The value of the `Content-Type` header. pub content_type: Option, diff --git a/actix-multipart/src/form/json.rs b/actix-multipart/src/form/json.rs index 3504c340c..0118a8fba 100644 --- a/actix-multipart/src/form/json.rs +++ b/actix-multipart/src/form/json.rs @@ -134,8 +134,7 @@ impl Default for JsonConfig { mod tests { use std::collections::HashMap; - use actix_web::{http::StatusCode, web, App, HttpResponse, Responder}; - use bytes::Bytes; + use actix_web::{http::StatusCode, web, web::Bytes, App, HttpResponse, Responder}; use crate::form::{ json::{Json, JsonConfig}, diff --git a/actix-multipart/src/lib.rs b/actix-multipart/src/lib.rs index 63c2e890f..d61aa139b 100644 --- a/actix-multipart/src/lib.rs +++ b/actix-multipart/src/lib.rs @@ -60,6 +60,7 @@ extern crate self as actix_multipart; mod error; mod extractor; pub mod form; +pub(crate) mod payload; pub(crate) mod safety; mod server; pub mod test; diff --git a/actix-multipart/src/payload.rs b/actix-multipart/src/payload.rs new file mode 100644 index 000000000..d27cdbe05 --- /dev/null +++ b/actix-multipart/src/payload.rs @@ -0,0 +1,130 @@ +use std::{ + cell::{RefCell, RefMut}, + pin::Pin, + rc::Rc, + task::{Context, Poll}, +}; + +use actix_web::{ + error::PayloadError, + web::{Bytes, BytesMut}, +}; +use futures_core::stream::{LocalBoxStream, Stream}; + +use crate::{error::MultipartError, safety::Safety}; + +pub(crate) struct PayloadRef { + payload: Rc>, +} + +impl PayloadRef { + pub(crate) fn new(payload: PayloadBuffer) -> PayloadRef { + PayloadRef { + payload: Rc::new(payload.into()), + } + } + + pub(crate) fn get_mut(&self, safety: &Safety) -> Option> { + if safety.current() { + Some(self.payload.borrow_mut()) + } else { + None + } + } +} + +impl Clone for PayloadRef { + fn clone(&self) -> PayloadRef { + PayloadRef { + payload: Rc::clone(&self.payload), + } + } +} + +/// Payload buffer. +pub(crate) struct PayloadBuffer { + pub(crate) eof: bool, + pub(crate) buf: BytesMut, + pub(crate) stream: LocalBoxStream<'static, Result>, +} + +impl PayloadBuffer { + /// Constructs new `PayloadBuffer` instance. + pub(crate) fn new(stream: S) -> Self + where + S: Stream> + 'static, + { + PayloadBuffer { + eof: false, + buf: BytesMut::new(), + stream: Box::pin(stream), + } + } + + pub(crate) fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> { + loop { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data), + Poll::Ready(Some(Err(err))) => return Err(err), + Poll::Ready(None) => { + self.eof = true; + return Ok(()); + } + Poll::Pending => return Ok(()), + } + } + } + + /// Read exact number of bytes + #[cfg(test)] + pub(crate) fn read_exact(&mut self, size: usize) -> Option { + if size <= self.buf.len() { + Some(self.buf.split_to(size).freeze()) + } else { + None + } + } + + pub(crate) fn read_max(&mut self, size: u64) -> Result, MultipartError> { + if !self.buf.is_empty() { + let size = std::cmp::min(self.buf.len() as u64, size) as usize; + Ok(Some(self.buf.split_to(size).freeze())) + } else if self.eof { + Err(MultipartError::Incomplete) + } else { + Ok(None) + } + } + + /// Read until specified ending + pub(crate) fn read_until(&mut self, line: &[u8]) -> Result, MultipartError> { + let res = memchr::memmem::find(&self.buf, line) + .map(|idx| self.buf.split_to(idx + line.len()).freeze()); + + if res.is_none() && self.eof { + Err(MultipartError::Incomplete) + } else { + Ok(res) + } + } + + /// Read bytes until new line delimiter + pub(crate) fn readline(&mut self) -> Result, MultipartError> { + self.read_until(b"\n") + } + + /// Read bytes until new line delimiter or eof + pub(crate) fn readline_or_eof(&mut self) -> Result, MultipartError> { + match self.readline() { + Err(MultipartError::Incomplete) if self.eof => Ok(Some(self.buf.split().freeze())), + line => line, + } + } + + /// Put unprocessed data back to the buffer + pub(crate) fn unprocessed(&mut self, data: Bytes) { + let buf = BytesMut::from(data.as_ref()); + let buf = std::mem::replace(&mut self.buf, buf); + self.buf.extend_from_slice(&buf); + } +} diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 1a173f88a..bbd96621b 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -1,7 +1,7 @@ //! Multipart response payload support. use std::{ - cell::{RefCell, RefMut}, + cell::RefCell, cmp, fmt, pin::Pin, rc::Rc, @@ -12,13 +12,17 @@ use actix_web::{ dev, error::{ParseError, PayloadError}, http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue}, + web::Bytes, HttpRequest, }; -use bytes::{Bytes, BytesMut}; -use futures_core::stream::{LocalBoxStream, Stream}; +use futures_core::stream::Stream; use mime::Mime; -use crate::{error::MultipartError, safety::Safety}; +use crate::{ + error::MultipartError, + payload::{PayloadBuffer, PayloadRef}, + safety::Safety, +}; const MAX_HEADERS: usize = 32; @@ -767,122 +771,6 @@ impl InnerField { } } -struct PayloadRef { - payload: Rc>, -} - -impl PayloadRef { - fn new(payload: PayloadBuffer) -> PayloadRef { - PayloadRef { - payload: Rc::new(payload.into()), - } - } - - fn get_mut(&self, safety: &Safety) -> Option> { - if safety.current() { - Some(self.payload.borrow_mut()) - } else { - None - } - } -} - -impl Clone for PayloadRef { - fn clone(&self) -> PayloadRef { - PayloadRef { - payload: Rc::clone(&self.payload), - } - } -} - -/// Payload buffer. -struct PayloadBuffer { - eof: bool, - buf: BytesMut, - stream: LocalBoxStream<'static, Result>, -} - -impl PayloadBuffer { - /// Constructs new `PayloadBuffer` instance. - fn new(stream: S) -> Self - where - S: Stream> + 'static, - { - PayloadBuffer { - eof: false, - buf: BytesMut::new(), - stream: Box::pin(stream), - } - } - - fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> { - loop { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data), - Poll::Ready(Some(Err(err))) => return Err(err), - Poll::Ready(None) => { - self.eof = true; - return Ok(()); - } - Poll::Pending => return Ok(()), - } - } - } - - /// Read exact number of bytes - #[cfg(test)] - fn read_exact(&mut self, size: usize) -> Option { - if size <= self.buf.len() { - Some(self.buf.split_to(size).freeze()) - } else { - None - } - } - - fn read_max(&mut self, size: u64) -> Result, MultipartError> { - if !self.buf.is_empty() { - let size = std::cmp::min(self.buf.len() as u64, size) as usize; - Ok(Some(self.buf.split_to(size).freeze())) - } else if self.eof { - Err(MultipartError::Incomplete) - } else { - Ok(None) - } - } - - /// Read until specified ending - fn read_until(&mut self, line: &[u8]) -> Result, MultipartError> { - let res = memchr::memmem::find(&self.buf, line) - .map(|idx| self.buf.split_to(idx + line.len()).freeze()); - - if res.is_none() && self.eof { - Err(MultipartError::Incomplete) - } else { - Ok(res) - } - } - - /// Read bytes until new line delimiter - fn readline(&mut self) -> Result, MultipartError> { - self.read_until(b"\n") - } - - /// Read bytes until new line delimiter or eof - fn readline_or_eof(&mut self) -> Result, MultipartError> { - match self.readline() { - Err(MultipartError::Incomplete) if self.eof => Ok(Some(self.buf.split().freeze())), - line => line, - } - } - - /// Put unprocessed data back to the buffer - fn unprocessed(&mut self, data: Bytes) { - let buf = BytesMut::from(data.as_ref()); - let buf = std::mem::replace(&mut self.buf, buf); - self.buf.extend_from_slice(&buf); - } -} - #[cfg(test)] mod tests { use std::time::Duration; @@ -892,10 +780,10 @@ mod tests { http::header::{DispositionParam, DispositionType}, rt, test::TestRequest, + web::{BufMut as _, BytesMut}, FromRequest, }; use assert_matches::assert_matches; - use bytes::BufMut as _; use futures_util::{future::lazy, StreamExt as _}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; diff --git a/actix-multipart/src/test.rs b/actix-multipart/src/test.rs index 828f37957..956595355 100644 --- a/actix-multipart/src/test.rs +++ b/actix-multipart/src/test.rs @@ -1,7 +1,9 @@ //! Multipart testing utilities. -use actix_web::http::header::{self, HeaderMap}; -use bytes::{BufMut as _, Bytes, BytesMut}; +use actix_web::{ + http::header::{self, HeaderMap}, + web::{BufMut as _, Bytes, BytesMut}, +}; use mime::Mime; use rand::{ distributions::{Alphanumeric, DistString as _},