From 71da72efdbbc3c1607356e38a7cf0bcfd3ce6d8d Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 6 Jan 2018 22:59:39 -0800 Subject: [PATCH 1/2] use general context impl --- Cargo.toml | 3 +- src/context.rs | 152 +++++++++---------------------------------------- src/server.rs | 8 +-- src/worker.rs | 4 +- 4 files changed, 31 insertions(+), 136 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6d8310687..5f9f27734 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,8 @@ tokio-tls = { version="0.1", optional = true } tokio-openssl = { version="0.1", optional = true } [dependencies.actix] -version = "^0.4.1" +#version = "^0.4.2" +git = "https://github.com/actix/actix.git" [dependencies.openssl] version = "0.9" diff --git a/src/context.rs b/src/context.rs index d6261a3a0..30d03e8ae 100644 --- a/src/context.rs +++ b/src/context.rs @@ -8,11 +8,11 @@ use futures::unsync::oneshot; use actix::{Actor, ActorState, ActorContext, AsyncContext, Handler, Subscriber, ResponseType, SpawnHandle}; use actix::fut::ActorFuture; -use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, - Envelope, ToEnvelope, RemoteEnvelope}; +use actix::dev::{AsyncContextApi, ActorAddressCell, + ContextImpl, Envelope, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; -use error::{Error, Result}; +use error::{Error, Result, ErrorInternalServerError}; use httprequest::HttpRequest; @@ -30,13 +30,8 @@ pub enum Frame { /// Http actor execution context pub struct HttpContext where A: Actor>, { - act: Option, - state: ActorState, - modified: bool, - items: ActorItemsCell, - address: ActorAddressCell, + inner: ContextImpl, stream: VecDeque, - wait: ActorWaitCell, request: HttpRequest, disconnected: bool, } @@ -46,23 +41,17 @@ impl ActorContext for HttpContext where A: Actor /// Stop actor execution fn stop(&mut self) { self.stream.push_back(Frame::Payload(None)); - self.items.stop(); - self.address.close(); - if self.state == ActorState::Running { - self.state = ActorState::Stopping; - } + self.inner.stop(); } /// Terminate actor execution fn terminate(&mut self) { - self.address.close(); - self.items.close(); - self.state = ActorState::Stopped; + self.inner.terminate() } /// Actor execution state fn state(&self) -> ActorState { - self.state + self.inner.state() } } @@ -71,31 +60,24 @@ impl AsyncContext for HttpContext where A: Actor fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static { - self.modified = true; - self.items.spawn(fut) + self.inner.spawn(fut) } fn wait(&mut self, fut: F) where F: ActorFuture + 'static { - self.modified = true; - self.wait.add(fut); + self.inner.wait(fut) } fn cancel_future(&mut self, handle: SpawnHandle) -> bool { - self.modified = true; - self.items.cancel_future(handle) - } - - fn cancel_future_on_stop(&mut self, handle: SpawnHandle) { - self.items.cancel_future_on_stop(handle) + self.inner.cancel_future(handle) } } #[doc(hidden)] impl AsyncContextApi for HttpContext where A: Actor { fn address_cell(&mut self) -> &mut ActorAddressCell { - &mut self.address + self.inner.address_cell() } } @@ -107,12 +89,7 @@ impl HttpContext where A: Actor { pub fn from_request(req: HttpRequest) -> HttpContext { HttpContext { - act: None, - state: ActorState::Started, - modified: false, - items: ActorItemsCell::default(), - address: ActorAddressCell::default(), - wait: ActorWaitCell::default(), + inner: ContextImpl::new(None), stream: VecDeque::new(), request: req, disconnected: false, @@ -120,7 +97,7 @@ impl HttpContext where A: Actor { } pub fn actor(mut self, actor: A) -> HttpContext { - self.act = Some(actor); + self.inner.set_actor(actor); self } } @@ -154,7 +131,7 @@ impl HttpContext where A: Actor { /// Returns drain future pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); - self.modified = true; + self.inner.modify(); self.stream.push_back(Frame::Drain(tx)); Drain::new(rx) } @@ -169,120 +146,43 @@ impl HttpContext where A: Actor { #[doc(hidden)] pub fn subscriber(&mut self) -> Box> - where A: Handler, - M: ResponseType + 'static, + where A: Handler, M: ResponseType + 'static { - Box::new(self.address.unsync_address()) + self.inner.subscriber() } #[doc(hidden)] pub fn sync_subscriber(&mut self) -> Box + Send> where A: Handler, - M: ResponseType + Send + 'static, - M::Item: Send, - M::Error: Send, + M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, { - Box::new(self.address.sync_address()) + self.inner.sync_subscriber() } } impl ActorHttpContext for HttpContext where A: Actor, S: 'static { fn disconnected(&mut self) { - self.items.stop(); self.disconnected = true; - if self.state == ActorState::Running { - self.state = ActorState::Stopping; - } + self.stop(); } fn poll(&mut self) -> Poll, Error> { - if self.act.is_none() { - return Ok(Async::Ready(None)) - } - let act: &mut A = unsafe { - std::mem::transmute(self.act.as_mut().unwrap() as &mut A) - }; let ctx: &mut HttpContext = unsafe { std::mem::transmute(self as &mut HttpContext) }; - // update state - match self.state { - ActorState::Started => { - Actor::started(act, ctx); - self.state = ActorState::Running; - }, - ActorState::Stopping => { - Actor::stopping(act, ctx); - } - _ => () - } - - let mut prep_stop = false; - loop { - self.modified = false; - - // check wait futures - if self.wait.poll(act, ctx) { + match self.inner.poll(ctx) { + Ok(Async::NotReady) => { // get frame if let Some(frame) = self.stream.pop_front() { - return Ok(Async::Ready(Some(frame))) + Ok(Async::Ready(Some(frame))) + } else { + Ok(Async::NotReady) } - return Ok(Async::NotReady) } - - // incoming messages - self.address.poll(act, ctx); - - // spawned futures and streams - self.items.poll(act, ctx); - - // are we done - if self.modified { - continue - } - - // get frame - if let Some(frame) = self.stream.pop_front() { - return Ok(Async::Ready(Some(frame))) - } - - // check state - match self.state { - ActorState::Stopped => { - self.state = ActorState::Stopped; - Actor::stopped(act, ctx); - return Ok(Async::Ready(None)) - }, - ActorState::Stopping => { - if prep_stop { - if self.address.connected() || !self.items.is_empty() { - self.state = ActorState::Running; - continue - } else { - self.state = ActorState::Stopped; - Actor::stopped(act, ctx); - return Ok(Async::Ready(None)) - } - } else { - Actor::stopping(act, ctx); - prep_stop = true; - continue - } - }, - ActorState::Running => { - if !self.address.connected() && self.items.is_empty() { - self.state = ActorState::Stopping; - Actor::stopping(act, ctx); - prep_stop = true; - continue - } - }, - _ => (), - } - - return Ok(Async::NotReady) + Ok(Async::Ready(())) => Ok(Async::Ready(None)), + Err(_) => Err(ErrorInternalServerError("error").into()), } } } diff --git a/src/server.rs b/src/server.rs index a38bb2b1d..501445174 100644 --- a/src/server.rs +++ b/src/server.rs @@ -556,12 +556,6 @@ impl Handler for HttpServer } } -impl StreamHandler>> for HttpServer - where T: IoStream, - H: HttpHandler + 'static, - U: 'static, - A: 'static {} - impl Handler>> for HttpServer where T: IoStream, H: HttpHandler + 'static, @@ -682,7 +676,7 @@ impl Handler for HttpServer if self.exit { Arbiter::system().send(actix::msgs::SystemExit(0)) } - Self::empty() + Self::reply(Ok(())) } } } diff --git a/src/worker.rs b/src/worker.rs index bb8190fde..7b996a430 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -177,7 +177,7 @@ impl Handler for Worker let num = self.settings.channels.get(); if num == 0 { info!("Shutting down http worker, 0 connections"); - Self::reply(true) + Self::reply(Ok(true)) } else if let Some(dur) = msg.graceful { info!("Graceful http worker shutdown, {} connections", num); let (tx, rx) = oneshot::channel(); @@ -186,7 +186,7 @@ impl Handler for Worker } else { info!("Force shutdown http worker, {} connections", num); self.settings.head().traverse::(); - Self::reply(false) + Self::reply(Ok(false)) } } } From 896981cdf89e0bf23a98f100eb3c94bbdfd6adc7 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 6 Jan 2018 23:22:10 -0800 Subject: [PATCH 2/2] update examples --- examples/websocket-chat/Cargo.toml | 3 ++- examples/websocket-chat/src/client.rs | 4 +++- examples/websocket-chat/src/main.rs | 3 ++- examples/websocket-chat/src/session.rs | 4 ++-- examples/websocket/Cargo.toml | 3 ++- 5 files changed, 11 insertions(+), 6 deletions(-) diff --git a/examples/websocket-chat/Cargo.toml b/examples/websocket-chat/Cargo.toml index f5534f9be..a155e0e11 100644 --- a/examples/websocket-chat/Cargo.toml +++ b/examples/websocket-chat/Cargo.toml @@ -25,5 +25,6 @@ serde = "1.0" serde_json = "1.0" serde_derive = "1.0" -actix = "0.4" +#actix = "0.4" +actix = { git = "https://github.com/actix/actix" } actix-web = { git = "https://github.com/actix/actix-web" } diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index f1e52e051..c57825fc9 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -67,11 +67,13 @@ impl Actor for ChatClient { self.hb(ctx) } - fn stopping(&mut self, _: &mut FramedContext) { + fn stopping(&mut self, _: &mut FramedContext) -> bool { println!("Disconnected"); // Stop application on disconnect Arbiter::system().send(actix::msgs::SystemExit(0)); + + true } } diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 633c6556c..aec05ec74 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -75,9 +75,10 @@ impl Actor for WsChatSession { }).wait(ctx); } - fn stopping(&mut self, ctx: &mut Self::Context) { + fn stopping(&mut self, ctx: &mut Self::Context) -> bool { // notify chat server ctx.state().addr.send(server::Disconnect{id: self.id}); + true } } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 47c0d0be4..b0725fde4 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -52,10 +52,10 @@ impl Actor for ChatSession { }).wait(ctx); } - fn stopping(&mut self, ctx: &mut Self::Context) { + fn stopping(&mut self, ctx: &mut Self::Context) -> bool { // notify chat server self.addr.send(server::Disconnect{id: self.id}); - ctx.stop() + true } } diff --git a/examples/websocket/Cargo.toml b/examples/websocket/Cargo.toml index 5c6bd9889..e75f5c390 100644 --- a/examples/websocket/Cargo.toml +++ b/examples/websocket/Cargo.toml @@ -10,5 +10,6 @@ path = "src/main.rs" [dependencies] env_logger = "*" futures = "0.1" -actix = "0.4" +#actix = "0.4" +actix = { git = "https://github.com/actix/actix.git" } actix-web = { git = "https://github.com/actix/actix-web.git" }