From 32212bad1f36790bfbb16be823624a41d9e2327b Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 25 Jun 2018 09:08:28 +0600 Subject: [PATCH] simplify http response pool --- src/application.rs | 3 +- src/httpresponse.rs | 113 ++++++++++++++++------------------------- src/pipeline.rs | 39 +++++++++----- src/server/settings.rs | 10 ++-- 4 files changed, 77 insertions(+), 88 deletions(-) diff --git a/src/application.rs b/src/application.rs index bdc55fe79..906e8f9a0 100644 --- a/src/application.rs +++ b/src/application.rs @@ -636,7 +636,8 @@ where }; } - let (router, resources) = Router::new(&prefix, parts.settings, resources); + let (router, resources) = + Router::new(&prefix, parts.settings.clone(), resources); let inner = Rc::new(Inner { prefix: prefix_len, diff --git a/src/httpresponse.rs b/src/httpresponse.rs index 333e6c4ad..7ed82a8d1 100644 --- a/src/httpresponse.rs +++ b/src/httpresponse.rs @@ -1,8 +1,7 @@ //! Http response -use std::cell::UnsafeCell; +use std::cell::RefCell; use std::collections::VecDeque; use std::io::Write; -use std::rc::Rc; use std::{fmt, mem, str}; use bytes::{BufMut, Bytes, BytesMut}; @@ -36,30 +35,17 @@ pub enum ConnectionType { } /// An HTTP Response -pub struct HttpResponse( - Option>, - Rc>, -); - -impl Drop for HttpResponse { - fn drop(&mut self) { - if let Some(inner) = self.0.take() { - HttpResponsePool::release(&self.1, inner) - } - } -} +pub struct HttpResponse(Box); impl HttpResponse { - #[inline(always)] - #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] + #[inline] fn get_ref(&self) -> &InnerHttpResponse { - self.0.as_ref().unwrap() + self.0.as_ref() } - #[inline(always)] - #[cfg_attr(feature = "cargo-clippy", allow(inline_always))] + #[inline] fn get_mut(&mut self) -> &mut InnerHttpResponse { - self.0.as_mut().unwrap() + self.0.as_mut() } /// Create http response builder with specific status. @@ -96,7 +82,7 @@ impl HttpResponse { /// Convert `HttpResponse` to a `HttpResponseBuilder` #[inline] - pub fn into_builder(mut self) -> HttpResponseBuilder { + pub fn into_builder(self) -> HttpResponseBuilder { // If this response has cookies, load them into a jar let mut jar: Option = None; for c in self.cookies() { @@ -109,11 +95,8 @@ impl HttpResponse { } } - let response = self.0.take(); - let pool = Some(Rc::clone(&self.1)); HttpResponseBuilder { - response, - pool, + response: Some(self.0), err: None, cookies: jar, } @@ -299,15 +282,12 @@ impl HttpResponse { self.get_mut().write_capacity = cap; } - pub(crate) fn into_parts(mut self) -> HttpResponseParts { - self.0.take().unwrap().into_parts() + pub(crate) fn into_parts(self) -> HttpResponseParts { + self.0.into_parts() } pub(crate) fn from_parts(parts: HttpResponseParts) -> HttpResponse { - HttpResponse( - Some(Box::new(InnerHttpResponse::from_parts(parts))), - HttpResponsePool::pool(), - ) + HttpResponse(Box::new(InnerHttpResponse::from_parts(parts))) } } @@ -353,7 +333,6 @@ impl<'a> Iterator for CookieIter<'a> { /// builder-like pattern. pub struct HttpResponseBuilder { response: Option>, - pool: Option>>, err: Option, cookies: Option, } @@ -643,7 +622,7 @@ impl HttpResponseBuilder { } } response.body = body.into(); - HttpResponse(Some(response), self.pool.take().unwrap()) + HttpResponse(response) } #[inline] @@ -692,7 +671,6 @@ impl HttpResponseBuilder { pub fn take(&mut self) -> HttpResponseBuilder { HttpResponseBuilder { response: self.response.take(), - pool: self.pool.take(), err: self.err.take(), cookies: self.cookies.take(), } @@ -973,27 +951,28 @@ impl InnerHttpResponse { } /// Internal use only! -pub(crate) struct HttpResponsePool(VecDeque>); +pub(crate) struct HttpResponsePool(RefCell>>); -thread_local!(static POOL: Rc> = HttpResponsePool::pool()); +thread_local!(static POOL: &'static HttpResponsePool = HttpResponsePool::pool()); impl HttpResponsePool { - pub fn pool() -> Rc> { - Rc::new(UnsafeCell::new(HttpResponsePool(VecDeque::with_capacity( - 128, - )))) + fn pool() -> &'static HttpResponsePool { + let pool = HttpResponsePool(RefCell::new(VecDeque::with_capacity(128))); + Box::leak(Box::new(pool)) + } + + pub fn get_pool() -> &'static HttpResponsePool { + POOL.with(|p| *p) } #[inline] pub fn get_builder( - pool: &Rc>, status: StatusCode, + pool: &'static HttpResponsePool, status: StatusCode, ) -> HttpResponseBuilder { - let p = unsafe { &mut *pool.as_ref().get() }; - if let Some(mut msg) = p.0.pop_front() { + if let Some(mut msg) = pool.0.borrow_mut().pop_front() { msg.status = status; HttpResponseBuilder { response: Some(msg), - pool: Some(Rc::clone(pool)), err: None, cookies: None, } @@ -1001,7 +980,6 @@ impl HttpResponsePool { let msg = Box::new(InnerHttpResponse::new(status, Body::Empty)); HttpResponseBuilder { response: Some(msg), - pool: Some(Rc::clone(pool)), err: None, cookies: None, } @@ -1010,16 +988,15 @@ impl HttpResponsePool { #[inline] pub fn get_response( - pool: &Rc>, status: StatusCode, body: Body, + pool: &'static HttpResponsePool, status: StatusCode, body: Body, ) -> HttpResponse { - let p = unsafe { &mut *pool.as_ref().get() }; - if let Some(mut msg) = p.0.pop_front() { + if let Some(mut msg) = pool.0.borrow_mut().pop_front() { msg.status = status; msg.body = body; - HttpResponse(Some(msg), Rc::clone(pool)) + HttpResponse(msg) } else { let msg = Box::new(InnerHttpResponse::new(status, body)); - HttpResponse(Some(msg), Rc::clone(pool)) + HttpResponse(msg) } } @@ -1033,24 +1010,24 @@ impl HttpResponsePool { POOL.with(|pool| HttpResponsePool::get_response(pool, status, body)) } - #[inline(always)] - #[cfg_attr(feature = "cargo-clippy", allow(boxed_local, inline_always))] - fn release( - pool: &Rc>, mut inner: Box, - ) { - let pool = unsafe { &mut *pool.as_ref().get() }; - if pool.0.len() < 128 { - inner.headers.clear(); - inner.version = None; - inner.chunked = None; - inner.reason = None; - inner.encoding = None; - inner.connection_type = None; - inner.response_size = 0; - inner.error = None; - inner.write_capacity = MAX_WRITE_BUFFER_SIZE; - pool.0.push_front(inner); - } + #[inline] + pub(crate) fn release(resp: HttpResponse) { + let mut inner = resp.0; + POOL.with(|pool| { + let mut p = pool.0.borrow_mut(); + if p.len() < 128 { + inner.headers.clear(); + inner.version = None; + inner.chunked = None; + inner.reason = None; + inner.encoding = None; + inner.connection_type = None; + inner.response_size = 0; + inner.error = None; + inner.write_capacity = MAX_WRITE_BUFFER_SIZE; + p.push_front(inner); + } + }); } } diff --git a/src/pipeline.rs b/src/pipeline.rs index 87400e297..9f38f768d 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -13,7 +13,7 @@ use error::Error; use handler::{AsyncResult, AsyncResultItem}; use header::ContentEncoding; use httprequest::HttpRequest; -use httpresponse::HttpResponse; +use httpresponse::{HttpResponse, HttpResponsePool}; use middleware::{Finished, Middleware, Response, Started}; use server::{HttpHandlerTask, Writer, WriterState}; @@ -691,7 +691,7 @@ impl ProcessResponse { /// Middlewares start executor struct FinishingMiddlewares { - resp: HttpResponse, + resp: Option, fut: Option>>, _s: PhantomData, _h: PhantomData, @@ -703,10 +703,10 @@ impl FinishingMiddlewares { info: &mut PipelineInfo, mws: &[Box>], resp: HttpResponse, ) -> PipelineState { if info.count == 0 { - Completed::init(info) + Completed::init(info, resp) } else { let mut state = FinishingMiddlewares { - resp, + resp: Some(resp), fut: None, _s: PhantomData, _h: PhantomData, @@ -741,15 +741,16 @@ impl FinishingMiddlewares { } self.fut = None; if info.count == 0 { - return Some(Completed::init(info)); + return Some(Completed::init(info, self.resp.take().unwrap())); } info.count -= 1; - let state = mws[info.count as usize].finish(&mut info.req, &self.resp); + let state = mws[info.count as usize] + .finish(&mut info.req, self.resp.as_ref().unwrap()); match state { Finished::Done => { if info.count == 0 { - return Some(Completed::init(info)); + return Some(Completed::init(info, self.resp.take().unwrap())); } } Finished::Future(fut) => { @@ -761,19 +762,20 @@ impl FinishingMiddlewares { } #[derive(Debug)] -struct Completed(PhantomData, PhantomData); +struct Completed(PhantomData, PhantomData, Option); impl Completed { #[inline] - fn init(info: &mut PipelineInfo) -> PipelineState { + fn init(info: &mut PipelineInfo, resp: HttpResponse) -> PipelineState { if let Some(ref err) = info.error { error!("Error occurred during request handling: {}", err); } if info.context.is_none() { + HttpResponsePool::release(resp); PipelineState::None } else { - PipelineState::Completed(Completed(PhantomData, PhantomData)) + PipelineState::Completed(Completed(PhantomData, PhantomData, Some(resp))) } } @@ -781,8 +783,14 @@ impl Completed { fn poll(&mut self, info: &mut PipelineInfo) -> Option> { match info.poll_context() { Ok(Async::NotReady) => None, - Ok(Async::Ready(())) => Some(PipelineState::None), - Err(_) => Some(PipelineState::Error), + Ok(Async::Ready(())) => { + HttpResponsePool::release(self.2.take().unwrap()); + Some(PipelineState::None) + } + Err(_) => { + HttpResponsePool::release(self.2.take().unwrap()); + Some(PipelineState::Error) + } } } } @@ -793,6 +801,7 @@ mod tests { use actix::*; use context::HttpContext; use futures::future::{lazy, result}; + use http::StatusCode; use tokio::runtime::current_thread::Runtime; impl PipelineState { @@ -823,16 +832,18 @@ mod tests { .unwrap() .block_on(lazy(|| { let mut info = PipelineInfo::new(HttpRequest::default()); - Completed::<(), Inner<()>>::init(&mut info) + let resp = HttpResponse::new(StatusCode::OK); + Completed::<(), Inner<()>>::init(&mut info, resp) .is_none() .unwrap(); let req = HttpRequest::default(); let ctx = HttpContext::new(req.clone(), MyActor); let addr = ctx.address(); + let resp = HttpResponse::new(StatusCode::OK); let mut info = PipelineInfo::new(req); info.context = Some(Box::new(ctx)); - let mut state = Completed::<(), Inner<()>>::init(&mut info) + let mut state = Completed::<(), Inner<()>>::init(&mut info, resp) .completed() .unwrap(); diff --git a/src/server/settings.rs b/src/server/settings.rs index 4ec1cde21..31750b220 100644 --- a/src/server/settings.rs +++ b/src/server/settings.rs @@ -42,7 +42,7 @@ pub struct ServerSettings { secure: bool, host: String, cpu_pool: UnsafeCell>, - responses: Rc>, + responses: &'static HttpResponsePool, } impl Clone for ServerSettings { @@ -52,7 +52,7 @@ impl Clone for ServerSettings { secure: self.secure, host: self.host.clone(), cpu_pool: UnsafeCell::new(None), - responses: HttpResponsePool::pool(), + responses: HttpResponsePool::get_pool(), } } } @@ -63,7 +63,7 @@ impl Default for ServerSettings { addr: None, secure: false, host: "localhost:8080".to_owned(), - responses: HttpResponsePool::pool(), + responses: HttpResponsePool::get_pool(), cpu_pool: UnsafeCell::new(None), } } @@ -82,7 +82,7 @@ impl ServerSettings { "localhost".to_owned() }; let cpu_pool = UnsafeCell::new(None); - let responses = HttpResponsePool::pool(); + let responses = HttpResponsePool::get_pool(); ServerSettings { addr, secure, @@ -103,7 +103,7 @@ impl ServerSettings { host, secure, cpu_pool: UnsafeCell::new(None), - responses: HttpResponsePool::pool(), + responses: HttpResponsePool::get_pool(), } }