1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-23 07:28:06 +00:00

refactor(multipart): move Payload* to module

This commit is contained in:
Rob Ede 2024-07-04 00:37:25 +01:00
parent 2136e07bdd
commit befb9c8196
No known key found for this signature in database
GPG key ID: 97C636207D3EF933
7 changed files with 147 additions and 129 deletions

View file

@ -41,7 +41,6 @@ actix-multipart-derive = { version = "=0.6.1", optional = true }
actix-utils = "3"
actix-web = { version = "4", default-features = false }
bytes = "1"
derive_more = "0.99.5"
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] }

View file

@ -1,7 +1,6 @@
//! Reads a field into memory.
use actix_web::HttpRequest;
use bytes::BytesMut;
use actix_web::{web::BytesMut, HttpRequest};
use futures_core::future::LocalBoxFuture;
use futures_util::TryStreamExt as _;
use mime::Mime;
@ -15,7 +14,7 @@ use crate::{
#[derive(Debug)]
pub struct Bytes {
/// The data.
pub data: bytes::Bytes,
pub data: actix_web::web::Bytes,
/// The value of the `Content-Type` header.
pub content_type: Option<Mime>,

View file

@ -134,8 +134,7 @@ impl Default for JsonConfig {
mod tests {
use std::collections::HashMap;
use actix_web::{http::StatusCode, web, App, HttpResponse, Responder};
use bytes::Bytes;
use actix_web::{http::StatusCode, web, web::Bytes, App, HttpResponse, Responder};
use crate::form::{
json::{Json, JsonConfig},

View file

@ -60,6 +60,7 @@ extern crate self as actix_multipart;
mod error;
mod extractor;
pub mod form;
pub(crate) mod payload;
pub(crate) mod safety;
mod server;
pub mod test;

View file

@ -0,0 +1,130 @@
use std::{
cell::{RefCell, RefMut},
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use actix_web::{
error::PayloadError,
web::{Bytes, BytesMut},
};
use futures_core::stream::{LocalBoxStream, Stream};
use crate::{error::MultipartError, safety::Safety};
pub(crate) struct PayloadRef {
payload: Rc<RefCell<PayloadBuffer>>,
}
impl PayloadRef {
pub(crate) fn new(payload: PayloadBuffer) -> PayloadRef {
PayloadRef {
payload: Rc::new(payload.into()),
}
}
pub(crate) fn get_mut(&self, safety: &Safety) -> Option<RefMut<'_, PayloadBuffer>> {
if safety.current() {
Some(self.payload.borrow_mut())
} else {
None
}
}
}
impl Clone for PayloadRef {
fn clone(&self) -> PayloadRef {
PayloadRef {
payload: Rc::clone(&self.payload),
}
}
}
/// Payload buffer.
pub(crate) struct PayloadBuffer {
pub(crate) eof: bool,
pub(crate) buf: BytesMut,
pub(crate) stream: LocalBoxStream<'static, Result<Bytes, PayloadError>>,
}
impl PayloadBuffer {
/// Constructs new `PayloadBuffer` instance.
pub(crate) fn new<S>(stream: S) -> Self
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
PayloadBuffer {
eof: false,
buf: BytesMut::new(),
stream: Box::pin(stream),
}
}
pub(crate) fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> {
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data),
Poll::Ready(Some(Err(err))) => return Err(err),
Poll::Ready(None) => {
self.eof = true;
return Ok(());
}
Poll::Pending => return Ok(()),
}
}
}
/// Read exact number of bytes
#[cfg(test)]
pub(crate) fn read_exact(&mut self, size: usize) -> Option<Bytes> {
if size <= self.buf.len() {
Some(self.buf.split_to(size).freeze())
} else {
None
}
}
pub(crate) fn read_max(&mut self, size: u64) -> Result<Option<Bytes>, MultipartError> {
if !self.buf.is_empty() {
let size = std::cmp::min(self.buf.len() as u64, size) as usize;
Ok(Some(self.buf.split_to(size).freeze()))
} else if self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
}
}
/// Read until specified ending
pub(crate) fn read_until(&mut self, line: &[u8]) -> Result<Option<Bytes>, MultipartError> {
let res = memchr::memmem::find(&self.buf, line)
.map(|idx| self.buf.split_to(idx + line.len()).freeze());
if res.is_none() && self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(res)
}
}
/// Read bytes until new line delimiter
pub(crate) fn readline(&mut self) -> Result<Option<Bytes>, MultipartError> {
self.read_until(b"\n")
}
/// Read bytes until new line delimiter or eof
pub(crate) fn readline_or_eof(&mut self) -> Result<Option<Bytes>, MultipartError> {
match self.readline() {
Err(MultipartError::Incomplete) if self.eof => Ok(Some(self.buf.split().freeze())),
line => line,
}
}
/// Put unprocessed data back to the buffer
pub(crate) fn unprocessed(&mut self, data: Bytes) {
let buf = BytesMut::from(data.as_ref());
let buf = std::mem::replace(&mut self.buf, buf);
self.buf.extend_from_slice(&buf);
}
}

View file

@ -1,7 +1,7 @@
//! Multipart response payload support.
use std::{
cell::{RefCell, RefMut},
cell::RefCell,
cmp, fmt,
pin::Pin,
rc::Rc,
@ -12,13 +12,17 @@ use actix_web::{
dev,
error::{ParseError, PayloadError},
http::header::{self, ContentDisposition, HeaderMap, HeaderName, HeaderValue},
web::Bytes,
HttpRequest,
};
use bytes::{Bytes, BytesMut};
use futures_core::stream::{LocalBoxStream, Stream};
use futures_core::stream::Stream;
use mime::Mime;
use crate::{error::MultipartError, safety::Safety};
use crate::{
error::MultipartError,
payload::{PayloadBuffer, PayloadRef},
safety::Safety,
};
const MAX_HEADERS: usize = 32;
@ -767,122 +771,6 @@ impl InnerField {
}
}
struct PayloadRef {
payload: Rc<RefCell<PayloadBuffer>>,
}
impl PayloadRef {
fn new(payload: PayloadBuffer) -> PayloadRef {
PayloadRef {
payload: Rc::new(payload.into()),
}
}
fn get_mut(&self, safety: &Safety) -> Option<RefMut<'_, PayloadBuffer>> {
if safety.current() {
Some(self.payload.borrow_mut())
} else {
None
}
}
}
impl Clone for PayloadRef {
fn clone(&self) -> PayloadRef {
PayloadRef {
payload: Rc::clone(&self.payload),
}
}
}
/// Payload buffer.
struct PayloadBuffer {
eof: bool,
buf: BytesMut,
stream: LocalBoxStream<'static, Result<Bytes, PayloadError>>,
}
impl PayloadBuffer {
/// Constructs new `PayloadBuffer` instance.
fn new<S>(stream: S) -> Self
where
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
PayloadBuffer {
eof: false,
buf: BytesMut::new(),
stream: Box::pin(stream),
}
}
fn poll_stream(&mut self, cx: &mut Context<'_>) -> Result<(), PayloadError> {
loop {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(Ok(data))) => self.buf.extend_from_slice(&data),
Poll::Ready(Some(Err(err))) => return Err(err),
Poll::Ready(None) => {
self.eof = true;
return Ok(());
}
Poll::Pending => return Ok(()),
}
}
}
/// Read exact number of bytes
#[cfg(test)]
fn read_exact(&mut self, size: usize) -> Option<Bytes> {
if size <= self.buf.len() {
Some(self.buf.split_to(size).freeze())
} else {
None
}
}
fn read_max(&mut self, size: u64) -> Result<Option<Bytes>, MultipartError> {
if !self.buf.is_empty() {
let size = std::cmp::min(self.buf.len() as u64, size) as usize;
Ok(Some(self.buf.split_to(size).freeze()))
} else if self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(None)
}
}
/// Read until specified ending
fn read_until(&mut self, line: &[u8]) -> Result<Option<Bytes>, MultipartError> {
let res = memchr::memmem::find(&self.buf, line)
.map(|idx| self.buf.split_to(idx + line.len()).freeze());
if res.is_none() && self.eof {
Err(MultipartError::Incomplete)
} else {
Ok(res)
}
}
/// Read bytes until new line delimiter
fn readline(&mut self) -> Result<Option<Bytes>, MultipartError> {
self.read_until(b"\n")
}
/// Read bytes until new line delimiter or eof
fn readline_or_eof(&mut self) -> Result<Option<Bytes>, MultipartError> {
match self.readline() {
Err(MultipartError::Incomplete) if self.eof => Ok(Some(self.buf.split().freeze())),
line => line,
}
}
/// Put unprocessed data back to the buffer
fn unprocessed(&mut self, data: Bytes) {
let buf = BytesMut::from(data.as_ref());
let buf = std::mem::replace(&mut self.buf, buf);
self.buf.extend_from_slice(&buf);
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
@ -892,10 +780,10 @@ mod tests {
http::header::{DispositionParam, DispositionType},
rt,
test::TestRequest,
web::{BufMut as _, BytesMut},
FromRequest,
};
use assert_matches::assert_matches;
use bytes::BufMut as _;
use futures_util::{future::lazy, StreamExt as _};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

View file

@ -1,7 +1,9 @@
//! Multipart testing utilities.
use actix_web::http::header::{self, HeaderMap};
use bytes::{BufMut as _, Bytes, BytesMut};
use actix_web::{
http::header::{self, HeaderMap},
web::{BufMut as _, Bytes, BytesMut},
};
use mime::Mime;
use rand::{
distributions::{Alphanumeric, DistString as _},