1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-13 02:39:32 +00:00

cleanup pipeline

This commit is contained in:
Nikolay Kim 2017-11-25 12:05:27 -08:00
parent 37c1e78c7a
commit 53ce186294
2 changed files with 141 additions and 150 deletions

View file

@ -149,6 +149,140 @@ impl Pipeline {
}
}
type Fut = Box<Future<Item=(HttpRequest, Option<HttpResponse>), Error=Error>>;
/// Middlewares start executor
struct Start {
idx: usize,
hnd: *mut Handler,
disconnected: bool,
payload: Option<Payload>,
fut: Option<Fut>,
middlewares: Rc<Vec<Box<Middleware>>>,
}
enum StartResult {
Ready(Box<Handle>),
NotReady(Start),
}
impl Start {
fn init(mw: Rc<Vec<Box<Middleware>>>,
req: HttpRequest,
handler: PipelineHandler,
payload: Payload) -> Result<StartResult, Error> {
Start {
idx: 0,
fut: None,
disconnected: false,
hnd: handler as *const _ as *mut _,
payload: Some(payload),
middlewares: mw,
}.start(req)
}
fn disconnected(&mut self) {
self.disconnected = true;
}
fn prepare(&self, mut task: Task) -> Task {
if self.disconnected {
task.disconnected()
}
task.set_middlewares(MiddlewaresResponse::new(self.idx-1, Rc::clone(&self.middlewares)));
task
}
fn start(mut self, mut req: HttpRequest) -> Result<StartResult, Error> {
let len = self.middlewares.len();
loop {
if self.idx == len {
let task = (unsafe{&*self.hnd})(
&mut req, self.payload.take().expect("Something is completlywrong"));
return Ok(StartResult::Ready(
Box::new(Handle::new(self.idx-1, req, self.prepare(task), self.middlewares))))
} else {
req = match self.middlewares[self.idx].start(req) {
Started::Done(req) => {
self.idx += 1;
req
}
Started::Response(req, resp) => {
return Ok(StartResult::Ready(
Box::new(Handle::new(
self.idx, req, self.prepare(Task::reply(resp)), self.middlewares))))
},
Started::Future(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.fut = Some(fut);
return Ok(StartResult::NotReady(self))
}
Ok(Async::Ready((req, resp))) => {
if let Some(resp) = resp {
return Ok(StartResult::Ready(
Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)), self.middlewares))))
}
self.idx += 1;
req
}
Err(err) => return Err(err)
}
},
Started::Err(err) => return Err(err),
}
}
}
}
fn poll(&mut self) -> Poll<Box<Handle>, Error> {
let len = self.middlewares.len();
'outer: loop {
match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready((mut req, resp))) => {
self.idx += 1;
if let Some(resp) = resp {
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, req,
self.prepare(Task::reply(resp)), Rc::clone(&self.middlewares)))))
}
if self.idx == len {
let task = (unsafe{&*self.hnd})(
&mut req, self.payload.take().expect("Something is completlywrong"));
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, req, self.prepare(task), Rc::clone(&self.middlewares)))))
} else {
loop {
req = match self.middlewares[self.idx].start(req) {
Started::Done(req) => {
self.idx += 1;
req
}
Started::Response(req, resp) => {
self.idx += 1;
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, req, self.prepare(Task::reply(resp)),
Rc::clone(&self.middlewares)))))
},
Started::Future(fut) => {
self.fut = Some(fut);
continue 'outer
},
Started::Err(err) => return Err(err),
}
}
}
}
Err(err) => return Err(err)
}
}
}
}
struct Handle {
idx: usize,
req: HttpRequest,
@ -157,13 +291,8 @@ struct Handle {
}
impl Handle {
fn new(idx: usize,
req: HttpRequest,
task: Task,
mw: Rc<Vec<Box<Middleware>>>) -> Handle
{
Handle {
idx: idx, req: req, task:task, middlewares: mw }
fn new(idx: usize, req: HttpRequest, task: Task, mw: Rc<Vec<Box<Middleware>>>) -> Handle {
Handle { idx: idx, req: req, task:task, middlewares: mw }
}
fn poll_io<T: Writer>(&mut self, io: &mut T) -> Poll<bool, Error> {
@ -227,139 +356,6 @@ impl Finish {
}
}
type Fut = Box<Future<Item=(HttpRequest, Option<HttpResponse>), Error=Error>>;
/// Middlewares start executor
struct Start {
idx: usize,
hnd: *mut Handler,
disconnected: bool,
payload: Option<Payload>,
fut: Option<Fut>,
middlewares: Rc<Vec<Box<Middleware>>>,
}
enum StartResult {
Ready(Box<Handle>),
NotReady(Start),
}
impl Start {
fn init(mw: Rc<Vec<Box<Middleware>>>,
req: HttpRequest,
handler: PipelineHandler,
payload: Payload) -> Result<StartResult, Error> {
Start {
idx: 0,
fut: None,
disconnected: false,
hnd: handler as *const _ as *mut _,
payload: Some(payload),
middlewares: mw,
}.start(req)
}
fn disconnected(&mut self) {
self.disconnected = true;
}
fn prepare(&self, mut task: Task) -> Task {
if self.disconnected {
task.disconnected()
}
task.set_middlewares(MiddlewaresResponse::new(Rc::clone(&self.middlewares)));
task
}
fn start(mut self, mut req: HttpRequest) -> Result<StartResult, Error> {
loop {
if self.idx >= self.middlewares.len() {
let task = (unsafe{&*self.hnd})(
&mut req, self.payload.take().expect("Something is completlywrong"));
return Ok(StartResult::Ready(
Box::new(Handle::new(self.idx-1, req, self.prepare(task), self.middlewares))))
} else {
req = match self.middlewares[self.idx].start(req) {
Started::Done(req) => {
self.idx += 1;
req
}
Started::Response(req, resp) => {
return Ok(StartResult::Ready(
Box::new(Handle::new(
self.idx, req, self.prepare(Task::reply(resp)), self.middlewares))))
},
Started::Future(mut fut) => {
match fut.poll() {
Ok(Async::NotReady) => {
self.fut = Some(fut);
return Ok(StartResult::NotReady(self))
}
Ok(Async::Ready((req, resp))) => {
self.idx += 1;
if let Some(resp) = resp {
return Ok(StartResult::Ready(
Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)), self.middlewares))))
}
req
}
Err(err) => return Err(err)
}
},
Started::Err(err) => return Err(err),
}
}
}
}
fn poll(&mut self) -> Poll<Box<Handle>, Error> {
'outer: loop {
match self.fut.as_mut().unwrap().poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready((mut req, resp))) => {
self.idx += 1;
if let Some(resp) = resp {
return Ok(Async::Ready(Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)), Rc::clone(&self.middlewares)))))
}
if self.idx >= self.middlewares.len() {
let task = (unsafe{&*self.hnd})(
&mut req, self.payload.take().expect("Something is completlywrong"));
return Ok(Async::Ready(Box::new(Handle::new(
self.idx-1, req,
self.prepare(task), Rc::clone(&self.middlewares)))))
} else {
loop {
req = match self.middlewares[self.idx].start(req) {
Started::Done(req) => {
self.idx += 1;
req
}
Started::Response(req, resp) => {
return Ok(Async::Ready(Box::new(Handle::new(
self.idx, req,
self.prepare(Task::reply(resp)),
Rc::clone(&self.middlewares)))))
},
Started::Future(fut) => {
self.fut = Some(fut);
continue 'outer
},
Started::Err(err) => return Err(err),
}
}
}
}
Err(err) => return Err(err)
}
}
}
}
/// Middlewares response executor
pub(crate) struct MiddlewaresResponse {
idx: usize,
@ -369,9 +365,9 @@ pub(crate) struct MiddlewaresResponse {
impl MiddlewaresResponse {
fn new(mw: Rc<Vec<Box<Middleware>>>) -> MiddlewaresResponse {
fn new(idx: usize, mw: Rc<Vec<Box<Middleware>>>) -> MiddlewaresResponse {
MiddlewaresResponse {
idx: 0,
idx: idx,
fut: None,
middlewares: mw }
}
@ -410,7 +406,7 @@ impl MiddlewaresResponse {
Ok(Async::NotReady) =>
return Ok(Async::NotReady),
Ok(Async::Ready(resp)) => {
self.idx += 1;
self.idx -= 1;
resp
}
Err(err) => return Err(err)

View file

@ -213,7 +213,7 @@ impl Task {
}
}
// if task is paused, write buffer probably is full
// if task is paused, write buffer is probably full
if self.state != TaskRunningState::Paused {
// process exiting frames
while let Some(frame) = self.frames.pop_front() {
@ -334,13 +334,8 @@ impl Task {
}
}
}
}
impl Future for Task {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
pub(crate) fn poll(&mut self) -> Poll<(), Error> {
let mut s = mem::replace(&mut self.stream, TaskStream::None);
let result = match s {