From 273de2260dc596ff27216066de088b5a4b94a0e7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 9 Dec 2017 05:54:04 -0800 Subject: [PATCH] refactor pipeline --- src/pipeline.rs | 346 ++++++++++++++++++++++-------------------------- 1 file changed, 161 insertions(+), 185 deletions(-) diff --git a/src/pipeline.rs b/src/pipeline.rs index 06f1e8f01..cea38fa9e 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -1,6 +1,7 @@ use std::{io, mem}; use std::rc::Rc; use std::cell::RefCell; +use std::marker::PhantomData; use futures::{Async, Poll, Future, Stream}; use futures::task::{Task as FutureTask, current as current_task}; @@ -18,7 +19,7 @@ use middlewares::{Middleware, Finished, Started, Response}; type Handler = Fn(HttpRequest) -> Reply; pub(crate) type PipelineHandler<'a, S> = &'a Fn(HttpRequest) -> Reply; -pub struct Pipeline(PipelineState); +pub struct Pipeline(PipelineInfo, PipelineState); enum PipelineState { None, @@ -31,47 +32,6 @@ enum PipelineState { Completed(Completed), } -impl PipelineState { - - fn is_done(&self) -> bool { - match *self { - PipelineState::None | PipelineState::Error - | PipelineState::Starting(_) | PipelineState::Handler(_) - | PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true, - PipelineState::Finishing(ref st) => st.info.context.is_none(), - PipelineState::Completed(_) => false, - } - } - - fn disconnect(&mut self) { - let info = match *self { - PipelineState::None | PipelineState::Error => return, - PipelineState::Starting(ref mut st) => &mut st.info, - PipelineState::Handler(ref mut st) => &mut st.info, - PipelineState::RunMiddlewares(ref mut st) => &mut st.info, - PipelineState::Response(ref mut st) => &mut st.info, - PipelineState::Finishing(ref mut st) => &mut st.info, - PipelineState::Completed(ref mut st) => &mut st.0, - }; - if let Some(ref mut context) = info.context { - context.disconnected(); - } - } - - fn error(&mut self) -> Option { - let info = match *self { - PipelineState::None | PipelineState::Error => return None, - PipelineState::Starting(ref mut st) => &mut st.info, - PipelineState::Handler(ref mut st) => &mut st.info, - PipelineState::RunMiddlewares(ref mut st) => &mut st.info, - PipelineState::Response(ref mut st) => &mut st.info, - PipelineState::Finishing(ref mut st) => &mut st.info, - PipelineState::Completed(ref mut st) => &mut st.0, - }; - info.error.take() - } -} - struct PipelineInfo { req: HttpRequest, count: usize, @@ -162,98 +122,122 @@ impl Future for DrainFut { impl Pipeline { pub fn new(req: HttpRequest, - mw: Rc>>>, + mws: Rc>>>, handler: PipelineHandler) -> Pipeline { - Pipeline(StartMiddlewares::init(mw, req, handler)) + let mut info = PipelineInfo { + req: req, + count: 0, + mws: mws, + error: None, + context: None, + }; + let state = StartMiddlewares::init(&mut info, handler); + + Pipeline(info, state) } } impl Pipeline<()> { pub fn error>(err: R) -> Box { - Box::new(Pipeline(ProcessResponse::init( - PipelineInfo::new(HttpRequest::default()), err.into()))) + Box::new(Pipeline( + PipelineInfo::new(HttpRequest::default()), ProcessResponse::init(err.into()))) + } +} + +impl Pipeline { + + fn is_done(&self) -> bool { + match self.1 { + PipelineState::None | PipelineState::Error + | PipelineState::Starting(_) | PipelineState::Handler(_) + | PipelineState::RunMiddlewares(_) | PipelineState::Response(_) => true, + PipelineState::Finishing(_) => self.0.context.is_none(), + PipelineState::Completed(_) => false, + } } } impl HttpHandlerTask for Pipeline { fn disconnected(&mut self) { - self.0.disconnect() + if let Some(ref mut context) = self.0.context { + context.disconnected(); + } } fn poll_io(&mut self, io: &mut Writer) -> Poll { loop { - let state = mem::replace(&mut self.0, PipelineState::None); + let state = mem::replace(&mut self.1, PipelineState::None); match state { PipelineState::None => return Ok(Async::Ready(true)), PipelineState::Error => return Err(io::Error::new(io::ErrorKind::Other, "Internal error").into()), PipelineState::Starting(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Handler(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::RunMiddlewares(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Response(st) => { - match st.poll_io(io) { + match st.poll_io(io, &mut self.0) { Ok(state) => { - self.0 = state; - if let Some(error) = self.0.error() { + self.1 = state; + if let Some(error) = self.0.error.take() { return Err(error) } else { - return Ok(Async::Ready(self.0.is_done())) + return Ok(Async::Ready(self.is_done())) } } Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Finishing(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Completed(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => { - self.0 = state; + self.1 = state; return Ok(Async::Ready(true)); } Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } @@ -264,63 +248,63 @@ impl HttpHandlerTask for Pipeline { fn poll(&mut self) -> Poll<(), Error> { loop { - let state = mem::replace(&mut self.0, PipelineState::None); + let state = mem::replace(&mut self.1, PipelineState::None); match state { PipelineState::None | PipelineState::Error => { return Ok(Async::Ready(())) } PipelineState::Starting(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Handler(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::RunMiddlewares(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Response(_) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady); } PipelineState::Finishing(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => - self.0 = state, + self.1 = state, Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } } PipelineState::Completed(st) => { - match st.poll() { + match st.poll(&mut self.0) { Ok(state) => { - self.0 = state; + self.1 = state; return Ok(Async::Ready(())); } Err(state) => { - self.0 = state; + self.1 = state; return Ok(Async::NotReady) } } @@ -336,21 +320,11 @@ type Fut = Box, Error=Error>>; struct StartMiddlewares { hnd: *mut Handler, fut: Option, - info: PipelineInfo, } impl StartMiddlewares { - fn init(mws: Rc>>>, - req: HttpRequest, handler: PipelineHandler) -> PipelineState { - let mut info = PipelineInfo { - req: req, - count: 0, - mws: mws, - error: None, - context: None, - }; - + fn init(info: &mut PipelineInfo, handler: PipelineHandler) -> PipelineState { // execute middlewares, we need this stage because middlewares could be non-async // and we can move to next state immidietly let len = info.mws.len(); @@ -369,8 +343,7 @@ impl StartMiddlewares { Ok(Async::NotReady) => return PipelineState::Starting(StartMiddlewares { hnd: handler as *const _ as *mut _, - fut: Some(fut), - info: info}), + fut: Some(fut)}), Ok(Async::Ready(resp)) => { if let Some(resp) = resp { return RunMiddlewares::init(info, resp); @@ -378,49 +351,49 @@ impl StartMiddlewares { info.count += 1; } Err(err) => - return ProcessResponse::init(info, err.into()), + return ProcessResponse::init(err.into()), }, Started::Err(err) => - return ProcessResponse::init(info, err.into()), + return ProcessResponse::init(err.into()), } } } } - fn poll(mut self) -> Result, PipelineState> { - let len = self.info.mws.len(); + fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> { + let len = info.mws.len(); 'outer: loop { match self.fut.as_mut().unwrap().poll() { Ok(Async::NotReady) => return Err(PipelineState::Starting(self)), Ok(Async::Ready(resp)) => { - self.info.count += 1; + info.count += 1; if let Some(resp) = resp { - return Ok(RunMiddlewares::init(self.info, resp)); + return Ok(RunMiddlewares::init(info, resp)); } - if self.info.count == len { - let reply = (unsafe{&*self.hnd})(self.info.req.clone()); - return Ok(WaitingResponse::init(self.info, reply)); + if info.count == len { + let reply = (unsafe{&*self.hnd})(info.req.clone()); + return Ok(WaitingResponse::init(info, reply)); } else { loop { - match self.info.mws[self.info.count].start(self.info.req_mut()) { + match info.mws[info.count].start(info.req_mut()) { Started::Done => - self.info.count += 1, + info.count += 1, Started::Response(resp) => { - return Ok(RunMiddlewares::init(self.info, resp)); + return Ok(RunMiddlewares::init(info, resp)); }, Started::Future(fut) => { self.fut = Some(fut); continue 'outer }, Started::Err(err) => - return Ok(ProcessResponse::init(self.info, err.into())) + return Ok(ProcessResponse::init(err.into())) } } } } Err(err) => - return Ok(ProcessResponse::init(self.info, err.into())) + return Ok(ProcessResponse::init(err.into())) } } } @@ -428,13 +401,13 @@ impl StartMiddlewares { // waiting for response struct WaitingResponse { - info: PipelineInfo, stream: PipelineResponse, + _s: PhantomData, } impl WaitingResponse { - fn init(info: PipelineInfo, reply: Reply) -> PipelineState + fn init(info: &mut PipelineInfo, reply: Reply) -> PipelineState { let stream = match reply.into() { ReplyItem::Message(resp) => @@ -446,10 +419,10 @@ impl WaitingResponse { }; PipelineState::Handler( - WaitingResponse { info: info, stream: stream }) + WaitingResponse { stream: stream, _s: PhantomData }) } - fn poll(mut self) -> Result, PipelineState> { + fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> { let stream = mem::replace(&mut self.stream, PipelineResponse::None); match stream { @@ -459,8 +432,8 @@ impl WaitingResponse { Ok(Async::Ready(Some(frame))) => { match frame { Frame::Message(resp) => { - self.info.context = Some(context); - return Ok(RunMiddlewares::init(self.info, resp)) + info.context = Some(context); + return Ok(RunMiddlewares::init(info, resp)) } Frame::Payload(_) | Frame::Drain(_) => (), } @@ -468,14 +441,14 @@ impl WaitingResponse { Ok(Async::Ready(None)) => { error!("Unexpected eof"); let err: Error = UnexpectedTaskFrame.into(); - return Ok(ProcessResponse::init(self.info, err.into())) + return Ok(ProcessResponse::init(err.into())) }, Ok(Async::NotReady) => { self.stream = PipelineResponse::Context(context); return Err(PipelineState::Handler(self)) }, Err(err) => - return Ok(ProcessResponse::init(self.info, err.into())) + return Ok(ProcessResponse::init(err.into())) } } }, @@ -486,9 +459,9 @@ impl WaitingResponse { Err(PipelineState::Handler(self)) } Ok(Async::Ready(response)) => - Ok(RunMiddlewares::init(self.info, response)), + Ok(RunMiddlewares::init(info, response)), Err(err) => - Ok(ProcessResponse::init(self.info, err.into())), + Ok(ProcessResponse::init(err.into())), } } PipelineResponse::None => { @@ -501,17 +474,17 @@ impl WaitingResponse { /// Middlewares response executor struct RunMiddlewares { - info: PipelineInfo, curr: usize, fut: Option>>, + _s: PhantomData, } impl RunMiddlewares { - fn init(mut info: PipelineInfo, mut resp: HttpResponse) -> PipelineState + fn init(info: &mut PipelineInfo, mut resp: HttpResponse) -> PipelineState { if info.count == 0 { - return ProcessResponse::init(info, resp); + return ProcessResponse::init(resp); } let mut curr = 0; let len = info.mws.len(); @@ -520,26 +493,26 @@ impl RunMiddlewares { resp = match info.mws[curr].response(info.req_mut(), resp) { Response::Err(err) => { info.count = curr + 1; - return ProcessResponse::init(info, err.into()) + return ProcessResponse::init(err.into()) } Response::Done(r) => { curr += 1; if curr == len { - return ProcessResponse::init(info, r) + return ProcessResponse::init(r) } else { r } }, Response::Future(fut) => { return PipelineState::RunMiddlewares( - RunMiddlewares { info: info, curr: curr, fut: Some(fut) }) + RunMiddlewares { curr: curr, fut: Some(fut), _s: PhantomData }) }, }; } } - fn poll(mut self) -> Result, PipelineState> { - let len = self.info.mws.len(); + fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> { + let len = info.mws.len(); loop { // poll latest fut @@ -551,16 +524,16 @@ impl RunMiddlewares { resp } Err(err) => - return Ok(ProcessResponse::init(self.info, err.into())), + return Ok(ProcessResponse::init(err.into())), }; loop { if self.curr == len { - return Ok(ProcessResponse::init(self.info, resp)); + return Ok(ProcessResponse::init(resp)); } else { - match self.info.mws[self.curr].response(self.info.req_mut(), resp) { + match info.mws[self.curr].response(info.req_mut(), resp) { Response::Err(err) => - return Ok(ProcessResponse::init(self.info, err.into())), + return Ok(ProcessResponse::init(err.into())), Response::Done(r) => { self.curr += 1; resp = r @@ -581,7 +554,7 @@ struct ProcessResponse { iostate: IOState, running: RunningState, drain: DrainVec, - info: PipelineInfo, + _s: PhantomData, } #[derive(PartialEq)] @@ -633,29 +606,31 @@ impl Drop for DrainVec { impl ProcessResponse { - fn init(info: PipelineInfo, resp: HttpResponse) -> PipelineState + fn init(resp: HttpResponse) -> PipelineState { PipelineState::Response( ProcessResponse{ resp: resp, iostate: IOState::Response, running: RunningState::Running, drain: DrainVec(Vec::new()), - info: info}) + _s: PhantomData}) } - fn poll_io(mut self, io: &mut Writer) -> Result, PipelineState> { + fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo) + -> Result, PipelineState> + { if self.drain.0.is_empty() && self.running != RunningState::Paused { // if task is paused, write buffer is probably full loop { let result = match mem::replace(&mut self.iostate, IOState::Done) { IOState::Response => { - let result = match io.start(self.info.req_mut().get_inner(), + let result = match io.start(info.req_mut().get_inner(), &mut self.resp) { Ok(res) => res, Err(err) => { - self.info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init(self.info, self.resp)) + info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(info, self.resp)) } }; @@ -672,13 +647,13 @@ impl ProcessResponse { IOState::Payload(mut body) => { // always poll context if self.running == RunningState::Running { - match self.info.poll_context() { + match info.poll_context() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => self.running = RunningState::Done, Err(err) => { - self.info.error = Some(err); - return Ok(FinishingMiddlewares::init(self.info, self.resp)) + info.error = Some(err); + return Ok(FinishingMiddlewares::init(info, self.resp)) } } } @@ -687,8 +662,8 @@ impl ProcessResponse { Ok(Async::Ready(None)) => { self.iostate = IOState::Done; if let Err(err) = io.write_eof() { - self.info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init(self.info, self.resp)) + info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(info, self.resp)) } break }, @@ -696,9 +671,8 @@ impl ProcessResponse { self.iostate = IOState::Payload(body); match io.write(chunk.as_ref()) { Err(err) => { - self.info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init( - self.info, self.resp)) + info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(info, self.resp)) }, Ok(result) => result } @@ -708,27 +682,27 @@ impl ProcessResponse { break }, Err(err) => { - self.info.error = Some(err); - return Ok(FinishingMiddlewares::init(self.info, self.resp)) + info.error = Some(err); + return Ok(FinishingMiddlewares::init(info, self.resp)) } } }, IOState::Context => { - match self.info.context.as_mut().unwrap().poll() { + match info.context.as_mut().unwrap().poll() { Ok(Async::Ready(Some(frame))) => { match frame { Frame::Message(msg) => { error!("Unexpected message frame {:?}", msg); - self.info.error = Some(UnexpectedTaskFrame.into()); + info.error = Some(UnexpectedTaskFrame.into()); return Ok( - FinishingMiddlewares::init(self.info, self.resp)) + FinishingMiddlewares::init(info, self.resp)) }, Frame::Payload(None) => { self.iostate = IOState::Done; if let Err(err) = io.write_eof() { - self.info.error = Some(err.into()); + info.error = Some(err.into()); return Ok( - FinishingMiddlewares::init(self.info, self.resp)) + FinishingMiddlewares::init(info, self.resp)) } break }, @@ -736,9 +710,9 @@ impl ProcessResponse { self.iostate = IOState::Context; match io.write(chunk.as_ref()) { Err(err) => { - self.info.error = Some(err.into()); + info.error = Some(err.into()); return Ok(FinishingMiddlewares::init( - self.info, self.resp)) + info, self.resp)) }, Ok(result) => result } @@ -751,7 +725,7 @@ impl ProcessResponse { }, Ok(Async::Ready(None)) => { self.iostate = IOState::Done; - self.info.context.take(); + info.context.take(); break } Ok(Async::NotReady) => { @@ -759,8 +733,8 @@ impl ProcessResponse { break } Err(err) => { - self.info.error = Some(err); - return Ok(FinishingMiddlewares::init(self.info, self.resp)) + info.error = Some(err); + return Ok(FinishingMiddlewares::init(info, self.resp)) } } } @@ -787,8 +761,8 @@ impl ProcessResponse { return Err(PipelineState::Response(self)), Err(err) => { debug!("Error sending data: {}", err); - self.info.error = Some(err.into()); - return Ok(FinishingMiddlewares::init(self.info, self.resp)) + info.error = Some(err.into()); + return Ok(FinishingMiddlewares::init(info, self.resp)) } } @@ -803,7 +777,7 @@ impl ProcessResponse { // response is completed if self.iostate.is_done() { self.resp.set_response_size(io.written()); - Ok(FinishingMiddlewares::init(self.info, self.resp)) + Ok(FinishingMiddlewares::init(info, self.resp)) } else { Err(PipelineState::Response(self)) } @@ -812,24 +786,24 @@ impl ProcessResponse { /// Middlewares start executor struct FinishingMiddlewares { - info: PipelineInfo, resp: HttpResponse, fut: Option>>, + _s: PhantomData, } impl FinishingMiddlewares { - fn init(info: PipelineInfo, resp: HttpResponse) -> PipelineState { + fn init(info: &mut PipelineInfo, resp: HttpResponse) -> PipelineState { if info.count == 0 { Completed::init(info) } else { - match (FinishingMiddlewares{info: info, resp: resp, fut: None}).poll() { + match (FinishingMiddlewares{resp: resp, fut: None, _s: PhantomData}).poll(info) { Ok(st) | Err(st) => st, } } } - fn poll(mut self) -> Result, PipelineState> { + fn poll(mut self, info: &mut PipelineInfo) -> Result, PipelineState> { loop { // poll latest fut let not_ready = if let Some(ref mut fut) = self.fut { @@ -852,12 +826,12 @@ impl FinishingMiddlewares { return Ok(PipelineState::Finishing(self)) } self.fut = None; - self.info.count -= 1; + info.count -= 1; - match self.info.mws[self.info.count].finish(self.info.req_mut(), &self.resp) { + match info.mws[info.count].finish(info.req_mut(), &self.resp) { Finished::Done => { - if self.info.count == 0 { - return Ok(Completed::init(self.info)) + if info.count == 0 { + return Ok(Completed::init(info)) } } Finished::Future(fut) => { @@ -868,21 +842,21 @@ impl FinishingMiddlewares { } } -struct Completed(PipelineInfo); +struct Completed(PhantomData); impl Completed { - fn init(info: PipelineInfo) -> PipelineState { + fn init(info: &mut PipelineInfo) -> PipelineState { if info.context.is_none() { PipelineState::None } else { - PipelineState::Completed(Completed(info)) + PipelineState::Completed(Completed(PhantomData)) } } - fn poll(mut self) -> Result, PipelineState> { - match self.0.poll_context() { - Ok(Async::NotReady) => Ok(PipelineState::Completed(self)), + fn poll(self, info: &mut PipelineInfo) -> Result, PipelineState> { + match info.poll_context() { + Ok(Async::NotReady) => Ok(PipelineState::Completed(Completed(PhantomData))), Ok(Async::Ready(())) => Ok(PipelineState::None), Err(_) => Ok(PipelineState::Error), } @@ -914,23 +888,25 @@ mod tests { #[test] fn test_completed() { Core::new().unwrap().run(lazy(|| { - let info = Box::new(PipelineInfo::new(HttpRequest::default())); - Completed::init(info).is_none().unwrap(); + let mut info = PipelineInfo::new(HttpRequest::default()); + Completed::init(&mut info).is_none().unwrap(); let req = HttpRequest::default(); let mut ctx = HttpContext::new(req.clone(), MyActor); let addr: Address<_> = ctx.address(); - let mut info = Box::new(PipelineInfo::new(req)); + let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); - let mut state = Completed::init(info).completed().unwrap(); + let mut state = Completed::init(&mut info).completed().unwrap(); - let st = state.poll().ok().unwrap(); - assert!(!st.is_done()); + let st = state.poll(&mut info).ok().unwrap(); + let pp = Pipeline(info, st); + assert!(!pp.is_done()); + let Pipeline(mut info, st) = pp; state = st.completed().unwrap(); drop(addr); - state.poll().ok().unwrap().is_none().unwrap(); + state.poll(&mut info).ok().unwrap().is_none().unwrap(); result(Ok::<_, ()>(())) })).unwrap()