diff --git a/src/context.rs b/src/context.rs index 30d03e8ae..403052f1b 100644 --- a/src/context.rs +++ b/src/context.rs @@ -6,10 +6,10 @@ use futures::sync::oneshot::Sender; use futures::unsync::oneshot; use actix::{Actor, ActorState, ActorContext, AsyncContext, - Handler, Subscriber, ResponseType, SpawnHandle}; + Address, SyncAddress, Handler, Subscriber, ResponseType, SpawnHandle}; use actix::fut::ActorFuture; -use actix::dev::{AsyncContextApi, ActorAddressCell, - ContextImpl, Envelope, ToEnvelope, RemoteEnvelope}; +use actix::dev::{queue, AsyncContextApi, + ContextImpl, ContextProtocol, Envelope, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; use error::{Error, Result, ErrorInternalServerError}; @@ -76,13 +76,25 @@ impl AsyncContext for HttpContext where A: Actor #[doc(hidden)] impl AsyncContextApi for HttpContext where A: Actor { - fn address_cell(&mut self) -> &mut ActorAddressCell { - self.inner.address_cell() + #[inline] + fn unsync_sender(&mut self) -> queue::unsync::UnboundedSender> { + self.inner.unsync_sender() + } + + #[inline] + fn unsync_address(&mut self) -> Address { + self.inner.unsync_address() + } + + #[inline] + fn sync_address(&mut self) -> SyncAddress { + self.inner.sync_address() } } impl HttpContext where A: Actor { + #[inline] pub fn new(req: HttpRequest, actor: A) -> HttpContext { HttpContext::from_request(req).actor(actor) } @@ -96,6 +108,7 @@ impl HttpContext where A: Actor { } } + #[inline] pub fn actor(mut self, actor: A) -> HttpContext { self.inner.set_actor(actor); self @@ -105,16 +118,19 @@ impl HttpContext where A: Actor { impl HttpContext where A: Actor { /// Shared application state + #[inline] pub fn state(&self) -> &S { self.request.state() } /// Incoming request + #[inline] pub fn request(&mut self) -> &mut HttpRequest { &mut self.request } /// Write payload + #[inline] pub fn write>(&mut self, data: B) { if !self.disconnected { self.stream.push_back(Frame::Payload(Some(data.into()))); @@ -124,6 +140,7 @@ impl HttpContext where A: Actor { } /// Indicate end of streamimng payload. Also this method calls `Self::close`. + #[inline] pub fn write_eof(&mut self) { self.stop(); } @@ -137,6 +154,7 @@ impl HttpContext where A: Actor { } /// Check if connection still open + #[inline] pub fn connected(&self) -> bool { !self.disconnected } @@ -144,6 +162,7 @@ impl HttpContext where A: Actor { impl HttpContext where A: Actor { + #[inline] #[doc(hidden)] pub fn subscriber(&mut self) -> Box> where A: Handler, M: ResponseType + 'static @@ -151,6 +170,7 @@ impl HttpContext where A: Actor { self.inner.subscriber() } + #[inline] #[doc(hidden)] pub fn sync_subscriber(&mut self) -> Box + Send> where A: Handler, @@ -162,6 +182,7 @@ impl HttpContext where A: Actor { impl ActorHttpContext for HttpContext where A: Actor, S: 'static { + #[inline] fn disconnected(&mut self) { self.disconnected = true; self.stop(); @@ -172,17 +193,20 @@ impl ActorHttpContext for HttpContext where A: Actor, std::mem::transmute(self as &mut HttpContext) }; - match self.inner.poll(ctx) { - Ok(Async::NotReady) => { - // get frame - if let Some(frame) = self.stream.pop_front() { - Ok(Async::Ready(Some(frame))) - } else { - Ok(Async::NotReady) - } + if self.inner.alive() { + match self.inner.poll(ctx) { + Ok(Async::NotReady) | Ok(Async::Ready(())) => (), + Err(_) => return Err(ErrorInternalServerError("error").into()), } - Ok(Async::Ready(())) => Ok(Async::Ready(None)), - Err(_) => Err(ErrorInternalServerError("error").into()), + } + + // frames + if let Some(frame) = self.stream.pop_front() { + Ok(Async::Ready(Some(frame))) + } else if self.inner.alive() { + Ok(Async::NotReady) + } else { + Ok(Async::Ready(None)) } } } @@ -190,6 +214,7 @@ impl ActorHttpContext for HttpContext where A: Actor, impl ToEnvelope for HttpContext where A: Actor>, { + #[inline] fn pack(msg: M, tx: Option>>, channel_on_drop: bool) -> Envelope where A: Handler, @@ -229,6 +254,7 @@ impl ActorFuture for Drain { type Error = (); type Actor = A; + #[inline] fn poll(&mut self, _: &mut A, _: &mut ::Context) -> Poll diff --git a/src/server.rs b/src/server.rs index 501445174..ded8c715b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -113,7 +113,13 @@ unsafe impl Sync for HttpServer where H: HttpHandler + ' unsafe impl Send for HttpServer where H: HttpHandler + 'static {} -impl Actor for HttpServer { +impl Actor for HttpServer + where A: 'static, + T: IoStream, + H: HttpHandler, + U: IntoIterator + 'static, + V: IntoHttpHandler, +{ type Context = Context; fn started(&mut self, ctx: &mut Self::Context) { @@ -121,13 +127,6 @@ impl Actor for Htt } } -impl HttpServer { - fn update_time(&self, ctx: &mut Context) { - helpers::update_date(); - ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); - } -} - impl HttpServer where A: 'static, T: IoStream, @@ -157,6 +156,11 @@ impl HttpServer } } + fn update_time(&self, ctx: &mut Context) { + helpers::update_date(); + ctx.run_later(Duration::new(1, 0), |slf, ctx| slf.update_time(ctx)); + } + /// Set number of workers to start. /// /// By default http server uses number of available logical cpu as threads count. @@ -294,14 +298,15 @@ impl HttpServer } // subscribe to os signals - fn subscribe_to_signals(&self, addr: &SyncAddress>) { - if self.no_signals { - let msg = signal::Subscribe(addr.subscriber()); + fn subscribe_to_signals(&self) -> Option> { + if !self.no_signals { if let Some(ref signals) = self.signals { - signals.send(msg); + Some(signals.clone()) } else { - Arbiter::system_registry().get::().send(msg); + Some(Arbiter::system_registry().get::()) } + } else { + None } } } @@ -355,10 +360,10 @@ impl HttpServer } // start http server actor - HttpServer::create(|ctx| { - self.subscribe_to_signals(&ctx.address()); - self - }) + let signals = self.subscribe_to_signals(); + let addr: SyncAddress<_> = Actor::start(self); + signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber()))); + addr } } @@ -427,10 +432,10 @@ impl HttpServer, net::SocketAddr, H, } // start http server actor - Ok(HttpServer::create(|ctx| { - self.subscribe_to_signals(&ctx.address()); - self - })) + let signals = self.subscribe_to_signals(); + let addr: SyncAddress<_> = Actor::start(self); + signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber()))); + Ok(addr) } } } @@ -470,10 +475,10 @@ impl HttpServer, net::SocketAddr, H, } // start http server actor - Ok(HttpServer::create(|ctx| { - self.subscribe_to_signals(&ctx.address()); - self - })) + let signals = self.subscribe_to_signals(); + let addr: SyncAddress<_> = Actor::start(self); + signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber()))); + Ok(addr) } } } @@ -514,22 +519,25 @@ impl HttpServer, A, H, U> self.h = Some(Rc::new(WorkerSettings::new(apps, self.keep_alive))); // start server - HttpServer::create(move |ctx| { + let signals = self.subscribe_to_signals(); + let addr: SyncAddress<_> = HttpServer::create(move |ctx| { ctx.add_stream(stream.map( move |(t, _)| Conn{io: WrapperStream::new(t), peer: None, http2: false})); - self.subscribe_to_signals(&ctx.address()); self - }) + }); + signals.map(|signals| signals.send(signal::Subscribe(addr.subscriber()))); + addr } } /// Signals support /// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and send `SystemExit(0)` /// message to `System` actor. -impl Handler for HttpServer +impl Handler for HttpServer where T: IoStream, H: HttpHandler + 'static, - U: 'static, + U: IntoIterator + 'static, + V: IntoHttpHandler, A: 'static, { type Result = (); @@ -556,10 +564,11 @@ impl Handler for HttpServer } } -impl Handler>> for HttpServer +impl Handler>> for HttpServer where T: IoStream, H: HttpHandler + 'static, - U: 'static, + U: IntoIterator + 'static, + V: IntoHttpHandler, A: 'static, { type Result = (); @@ -595,10 +604,11 @@ pub struct StopServer { pub graceful: bool } -impl Handler for HttpServer +impl Handler for HttpServer where T: IoStream, H: HttpHandler + 'static, - U: 'static, + U: IntoIterator + 'static, + V: IntoHttpHandler, A: 'static, { type Result = (); @@ -612,10 +622,11 @@ impl Handler for HttpServer } } -impl Handler for HttpServer +impl Handler for HttpServer where T: IoStream, H: HttpHandler + 'static, - U: 'static, + U: IntoIterator + 'static, + V: IntoHttpHandler, A: 'static, { type Result = (); @@ -628,10 +639,11 @@ impl Handler for HttpServer } } -impl Handler for HttpServer +impl Handler for HttpServer where T: IoStream, H: HttpHandler + 'static, - U: 'static, + U: IntoIterator + 'static, + V: IntoHttpHandler, A: 'static, { type Result = actix::Response;