use std; use std::rc::Rc; use std::cell::RefCell; use std::collections::VecDeque; use std::marker::PhantomData; use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; use actix::{Actor, ActorState, ActorContext, AsyncContext, Handler, Subscriber, ResponseType}; use actix::fut::ActorFuture; use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, ActorWaitCell, SpawnHandle, Envelope, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; use error::Error; use httprequest::HttpRequest; use httpresponse::HttpResponse; use pipeline::DrainFut; pub(crate) trait IoContext: 'static { fn disconnected(&mut self); fn poll(&mut self) -> Poll, Error>; } #[derive(Debug)] pub(crate) enum Frame { Message(HttpResponse), Payload(Option), Drain(Rc>), } /// Http actor execution context pub struct HttpContext where A: Actor>, { act: A, state: ActorState, modified: bool, items: ActorItemsCell, address: ActorAddressCell, stream: VecDeque, wait: ActorWaitCell, request: HttpRequest, streaming: bool, disconnected: bool, } impl ActorContext for HttpContext where A: Actor { /// Stop actor execution fn stop(&mut self) { self.stream.push_back(Frame::Payload(None)); self.items.stop(); self.address.close(); if self.state == ActorState::Running { self.state = ActorState::Stopping; } } /// Terminate actor execution fn terminate(&mut self) { self.address.close(); self.items.close(); self.state = ActorState::Stopped; } /// Actor execution state fn state(&self) -> ActorState { self.state } } impl AsyncContext for HttpContext where A: Actor { fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static { self.modified = true; self.items.spawn(fut) } fn wait(&mut self, fut: F) where F: ActorFuture + 'static { self.modified = true; self.wait.add(fut); } fn cancel_future(&mut self, handle: SpawnHandle) -> bool { self.modified = true; self.items.cancel_future(handle) } fn cancel_future_on_stop(&mut self, handle: SpawnHandle) { self.items.cancel_future_on_stop(handle) } } #[doc(hidden)] impl AsyncContextApi for HttpContext where A: Actor { fn address_cell(&mut self) -> &mut ActorAddressCell { &mut self.address } } impl HttpContext where A: Actor { pub fn new(req: HttpRequest, actor: A) -> HttpContext { HttpContext { act: actor, state: ActorState::Started, modified: false, items: ActorItemsCell::default(), address: ActorAddressCell::default(), wait: ActorWaitCell::default(), stream: VecDeque::new(), request: req, streaming: false, disconnected: false, } } } impl HttpContext where A: Actor { /// Shared application state pub fn state(&self) -> &S { self.request.state() } /// Incoming request pub fn request(&mut self) -> &mut HttpRequest { &mut self.request } /// Send response to peer pub fn start>(&mut self, response: R) { let resp = response.into(); match *resp.body() { Body::StreamingContext | Body::UpgradeContext => self.streaming = true, _ => (), } self.stream.push_back(Frame::Message(resp)) } /// Write payload pub fn write>(&mut self, data: B) { if self.streaming { if !self.disconnected { self.stream.push_back(Frame::Payload(Some(data.into()))) } } else { warn!("Trying to write response body for non-streaming response"); } } /// Indicate end of streamimng payload. Also this method calls `Self::close`. pub fn write_eof(&mut self) { self.stop(); } /// Returns drain future pub fn drain(&mut self) -> Drain { let fut = Rc::new(RefCell::new(DrainFut::default())); self.stream.push_back(Frame::Drain(Rc::clone(&fut))); self.modified = true; Drain{ a: PhantomData, inner: fut } } /// Check if connection still open pub fn connected(&self) -> bool { !self.disconnected } } impl HttpContext where A: Actor { #[doc(hidden)] pub fn subscriber(&mut self) -> Box> where A: Handler, M: ResponseType + 'static, { Box::new(self.address.unsync_address()) } #[doc(hidden)] pub fn sync_subscriber(&mut self) -> Box + Send> where A: Handler, M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, { Box::new(self.address.sync_address()) } } impl IoContext for HttpContext where A: Actor, S: 'static { fn disconnected(&mut self) { self.items.stop(); self.disconnected = true; if self.state == ActorState::Running { self.state = ActorState::Stopping; } } fn poll(&mut self) -> Poll, Error> { let act: &mut A = unsafe { std::mem::transmute(&mut self.act as &mut A) }; let ctx: &mut HttpContext = unsafe { std::mem::transmute(self as &mut HttpContext) }; // update state match self.state { ActorState::Started => { Actor::started(act, ctx); self.state = ActorState::Running; }, ActorState::Stopping => { Actor::stopping(act, ctx); } _ => () } let mut prep_stop = false; loop { self.modified = false; // check wait futures if self.wait.poll(act, ctx) { // get frame if let Some(frame) = self.stream.pop_front() { return Ok(Async::Ready(Some(frame))) } return Ok(Async::NotReady) } // incoming messages self.address.poll(act, ctx); // spawned futures and streams self.items.poll(act, ctx); // are we done if self.modified { continue } // get frame if let Some(frame) = self.stream.pop_front() { return Ok(Async::Ready(Some(frame))) } // check state match self.state { ActorState::Stopped => { self.state = ActorState::Stopped; Actor::stopped(act, ctx); return Ok(Async::Ready(None)) }, ActorState::Stopping => { if prep_stop { if self.address.connected() || !self.items.is_empty() { self.state = ActorState::Running; continue } else { self.state = ActorState::Stopped; Actor::stopped(act, ctx); return Ok(Async::Ready(None)) } } else { Actor::stopping(act, ctx); prep_stop = true; continue } }, ActorState::Running => { if !self.address.connected() && self.items.is_empty() { self.state = ActorState::Stopping; Actor::stopping(act, ctx); prep_stop = true; continue } }, _ => (), } return Ok(Async::NotReady) } } } impl ToEnvelope for HttpContext where A: Actor>, { fn pack(msg: M, tx: Option>>) -> Envelope where A: Handler, M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send { RemoteEnvelope::new(msg, tx).into() } } pub struct Drain { a: PhantomData, inner: Rc> } impl ActorFuture for Drain where A: Actor { type Item = (); type Error = (); type Actor = A; fn poll(&mut self, _: &mut A, _: &mut ::Context) -> Poll<(), ()> { self.inner.borrow_mut().poll() } }