From 53ce186294a774ae96dfccec5e9538e457a42aa5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 25 Nov 2017 12:05:27 -0800 Subject: [PATCH] cleanup pipeline --- src/pipeline.rs | 282 ++++++++++++++++++++++++------------------------ src/task.rs | 9 +- 2 files changed, 141 insertions(+), 150 deletions(-) diff --git a/src/pipeline.rs b/src/pipeline.rs index 87aa3f8f5..aa9979910 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -149,6 +149,140 @@ impl Pipeline { } } +type Fut = Box), Error=Error>>; + +/// Middlewares start executor +struct Start { + idx: usize, + hnd: *mut Handler, + disconnected: bool, + payload: Option, + fut: Option, + middlewares: Rc>>, +} + +enum StartResult { + Ready(Box), + NotReady(Start), +} + +impl Start { + + fn init(mw: Rc>>, + req: HttpRequest, + handler: PipelineHandler, + payload: Payload) -> Result { + Start { + idx: 0, + fut: None, + disconnected: false, + hnd: handler as *const _ as *mut _, + payload: Some(payload), + middlewares: mw, + }.start(req) + } + + fn disconnected(&mut self) { + self.disconnected = true; + } + + fn prepare(&self, mut task: Task) -> Task { + if self.disconnected { + task.disconnected() + } + task.set_middlewares(MiddlewaresResponse::new(self.idx-1, Rc::clone(&self.middlewares))); + task + } + + fn start(mut self, mut req: HttpRequest) -> Result { + let len = self.middlewares.len(); + loop { + if self.idx == len { + let task = (unsafe{&*self.hnd})( + &mut req, self.payload.take().expect("Something is completlywrong")); + return Ok(StartResult::Ready( + Box::new(Handle::new(self.idx-1, req, self.prepare(task), self.middlewares)))) + } else { + req = match self.middlewares[self.idx].start(req) { + Started::Done(req) => { + self.idx += 1; + req + } + Started::Response(req, resp) => { + return Ok(StartResult::Ready( + Box::new(Handle::new( + self.idx, req, self.prepare(Task::reply(resp)), self.middlewares)))) + }, + Started::Future(mut fut) => { + match fut.poll() { + Ok(Async::NotReady) => { + self.fut = Some(fut); + return Ok(StartResult::NotReady(self)) + } + Ok(Async::Ready((req, resp))) => { + if let Some(resp) = resp { + return Ok(StartResult::Ready( + Box::new(Handle::new( + self.idx, req, + self.prepare(Task::reply(resp)), self.middlewares)))) + } + self.idx += 1; + req + } + Err(err) => return Err(err) + } + }, + Started::Err(err) => return Err(err), + } + } + } + } + + fn poll(&mut self) -> Poll, Error> { + let len = self.middlewares.len(); + 'outer: loop { + match self.fut.as_mut().unwrap().poll() { + Ok(Async::NotReady) => return Ok(Async::NotReady), + Ok(Async::Ready((mut req, resp))) => { + self.idx += 1; + if let Some(resp) = resp { + return Ok(Async::Ready(Box::new(Handle::new( + self.idx-1, req, + self.prepare(Task::reply(resp)), Rc::clone(&self.middlewares))))) + } + if self.idx == len { + let task = (unsafe{&*self.hnd})( + &mut req, self.payload.take().expect("Something is completlywrong")); + return Ok(Async::Ready(Box::new(Handle::new( + self.idx-1, req, self.prepare(task), Rc::clone(&self.middlewares))))) + } else { + loop { + req = match self.middlewares[self.idx].start(req) { + Started::Done(req) => { + self.idx += 1; + req + } + Started::Response(req, resp) => { + self.idx += 1; + return Ok(Async::Ready(Box::new(Handle::new( + self.idx-1, req, self.prepare(Task::reply(resp)), + Rc::clone(&self.middlewares))))) + }, + Started::Future(fut) => { + self.fut = Some(fut); + continue 'outer + }, + Started::Err(err) => return Err(err), + } + } + } + } + Err(err) => return Err(err) + } + } + } +} + struct Handle { idx: usize, req: HttpRequest, @@ -157,13 +291,8 @@ struct Handle { } impl Handle { - fn new(idx: usize, - req: HttpRequest, - task: Task, - mw: Rc>>) -> Handle - { - Handle { - idx: idx, req: req, task:task, middlewares: mw } + fn new(idx: usize, req: HttpRequest, task: Task, mw: Rc>>) -> Handle { + Handle { idx: idx, req: req, task:task, middlewares: mw } } fn poll_io(&mut self, io: &mut T) -> Poll { @@ -227,139 +356,6 @@ impl Finish { } } -type Fut = Box), Error=Error>>; - -/// Middlewares start executor -struct Start { - idx: usize, - hnd: *mut Handler, - disconnected: bool, - payload: Option, - fut: Option, - middlewares: Rc>>, -} - -enum StartResult { - Ready(Box), - NotReady(Start), -} - -impl Start { - - fn init(mw: Rc>>, - req: HttpRequest, - handler: PipelineHandler, - payload: Payload) -> Result { - Start { - idx: 0, - fut: None, - disconnected: false, - hnd: handler as *const _ as *mut _, - payload: Some(payload), - middlewares: mw, - }.start(req) - } - - fn disconnected(&mut self) { - self.disconnected = true; - } - - fn prepare(&self, mut task: Task) -> Task { - if self.disconnected { - task.disconnected() - } - task.set_middlewares(MiddlewaresResponse::new(Rc::clone(&self.middlewares))); - task - } - - fn start(mut self, mut req: HttpRequest) -> Result { - loop { - if self.idx >= self.middlewares.len() { - let task = (unsafe{&*self.hnd})( - &mut req, self.payload.take().expect("Something is completlywrong")); - return Ok(StartResult::Ready( - Box::new(Handle::new(self.idx-1, req, self.prepare(task), self.middlewares)))) - } else { - req = match self.middlewares[self.idx].start(req) { - Started::Done(req) => { - self.idx += 1; - req - } - Started::Response(req, resp) => { - return Ok(StartResult::Ready( - Box::new(Handle::new( - self.idx, req, self.prepare(Task::reply(resp)), self.middlewares)))) - }, - Started::Future(mut fut) => { - match fut.poll() { - Ok(Async::NotReady) => { - self.fut = Some(fut); - return Ok(StartResult::NotReady(self)) - } - Ok(Async::Ready((req, resp))) => { - self.idx += 1; - if let Some(resp) = resp { - return Ok(StartResult::Ready( - Box::new(Handle::new( - self.idx, req, - self.prepare(Task::reply(resp)), self.middlewares)))) - } - req - } - Err(err) => return Err(err) - } - }, - Started::Err(err) => return Err(err), - } - } - } - } - - fn poll(&mut self) -> Poll, Error> { - 'outer: loop { - match self.fut.as_mut().unwrap().poll() { - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready((mut req, resp))) => { - self.idx += 1; - if let Some(resp) = resp { - return Ok(Async::Ready(Box::new(Handle::new( - self.idx, req, - self.prepare(Task::reply(resp)), Rc::clone(&self.middlewares))))) - } - if self.idx >= self.middlewares.len() { - let task = (unsafe{&*self.hnd})( - &mut req, self.payload.take().expect("Something is completlywrong")); - return Ok(Async::Ready(Box::new(Handle::new( - self.idx-1, req, - self.prepare(task), Rc::clone(&self.middlewares))))) - } else { - loop { - req = match self.middlewares[self.idx].start(req) { - Started::Done(req) => { - self.idx += 1; - req - } - Started::Response(req, resp) => { - return Ok(Async::Ready(Box::new(Handle::new( - self.idx, req, - self.prepare(Task::reply(resp)), - Rc::clone(&self.middlewares))))) - }, - Started::Future(fut) => { - self.fut = Some(fut); - continue 'outer - }, - Started::Err(err) => return Err(err), - } - } - } - } - Err(err) => return Err(err) - } - } - } -} - /// Middlewares response executor pub(crate) struct MiddlewaresResponse { idx: usize, @@ -369,9 +365,9 @@ pub(crate) struct MiddlewaresResponse { impl MiddlewaresResponse { - fn new(mw: Rc>>) -> MiddlewaresResponse { + fn new(idx: usize, mw: Rc>>) -> MiddlewaresResponse { MiddlewaresResponse { - idx: 0, + idx: idx, fut: None, middlewares: mw } } @@ -410,7 +406,7 @@ impl MiddlewaresResponse { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(resp)) => { - self.idx += 1; + self.idx -= 1; resp } Err(err) => return Err(err) diff --git a/src/task.rs b/src/task.rs index 067601d0b..f111891a6 100644 --- a/src/task.rs +++ b/src/task.rs @@ -213,7 +213,7 @@ impl Task { } } - // if task is paused, write buffer probably is full + // if task is paused, write buffer is probably full if self.state != TaskRunningState::Paused { // process exiting frames while let Some(frame) = self.frames.pop_front() { @@ -334,13 +334,8 @@ impl Task { } } } -} -impl Future for Task { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll { + pub(crate) fn poll(&mut self) -> Poll<(), Error> { let mut s = mem::replace(&mut self.stream, TaskStream::None); let result = match s {