use std; use std::marker::PhantomData; use std::collections::VecDeque; use futures::{Async, Future, Poll}; use futures::sync::oneshot::Sender; use futures::unsync::oneshot; use actix::{Actor, ActorState, ActorContext, AsyncContext, Handler, Subscriber, ResponseType, SpawnHandle}; use actix::fut::ActorFuture; use actix::dev::{AsyncContextApi, ActorAddressCell, ContextImpl, Envelope, ToEnvelope, RemoteEnvelope}; use body::{Body, Binary}; use error::{Error, Result, ErrorInternalServerError}; use httprequest::HttpRequest; pub trait ActorHttpContext: 'static { fn disconnected(&mut self); fn poll(&mut self) -> Poll, Error>; } #[derive(Debug)] pub enum Frame { Payload(Option), Drain(oneshot::Sender<()>), } /// Http actor execution context pub struct HttpContext where A: Actor>, { inner: ContextImpl, stream: VecDeque, request: HttpRequest, disconnected: bool, } impl ActorContext for HttpContext where A: Actor { /// Stop actor execution fn stop(&mut self) { self.stream.push_back(Frame::Payload(None)); self.inner.stop(); } /// Terminate actor execution fn terminate(&mut self) { self.inner.terminate() } /// Actor execution state fn state(&self) -> ActorState { self.inner.state() } } impl AsyncContext for HttpContext where A: Actor { fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static { self.inner.spawn(fut) } fn wait(&mut self, fut: F) where F: ActorFuture + 'static { self.inner.wait(fut) } fn cancel_future(&mut self, handle: SpawnHandle) -> bool { self.inner.cancel_future(handle) } } #[doc(hidden)] impl AsyncContextApi for HttpContext where A: Actor { fn address_cell(&mut self) -> &mut ActorAddressCell { self.inner.address_cell() } } impl HttpContext where A: Actor { pub fn new(req: HttpRequest, actor: A) -> HttpContext { HttpContext::from_request(req).actor(actor) } pub fn from_request(req: HttpRequest) -> HttpContext { HttpContext { inner: ContextImpl::new(None), stream: VecDeque::new(), request: req, disconnected: false, } } pub fn actor(mut self, actor: A) -> HttpContext { self.inner.set_actor(actor); self } } impl HttpContext where A: Actor { /// Shared application state pub fn state(&self) -> &S { self.request.state() } /// Incoming request pub fn request(&mut self) -> &mut HttpRequest { &mut self.request } /// Write payload pub fn write>(&mut self, data: B) { if !self.disconnected { self.stream.push_back(Frame::Payload(Some(data.into()))); } else { warn!("Trying to write to disconnected response"); } } /// Indicate end of streamimng payload. Also this method calls `Self::close`. pub fn write_eof(&mut self) { self.stop(); } /// Returns drain future pub fn drain(&mut self) -> Drain { let (tx, rx) = oneshot::channel(); self.inner.modify(); self.stream.push_back(Frame::Drain(tx)); Drain::new(rx) } /// Check if connection still open pub fn connected(&self) -> bool { !self.disconnected } } impl HttpContext where A: Actor { #[doc(hidden)] pub fn subscriber(&mut self) -> Box> where A: Handler, M: ResponseType + 'static { self.inner.subscriber() } #[doc(hidden)] pub fn sync_subscriber(&mut self) -> Box + Send> where A: Handler, M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send, { self.inner.sync_subscriber() } } impl ActorHttpContext for HttpContext where A: Actor, S: 'static { fn disconnected(&mut self) { self.disconnected = true; self.stop(); } fn poll(&mut self) -> Poll, Error> { let ctx: &mut HttpContext = unsafe { std::mem::transmute(self as &mut HttpContext) }; match self.inner.poll(ctx) { Ok(Async::NotReady) => { // get frame if let Some(frame) = self.stream.pop_front() { Ok(Async::Ready(Some(frame))) } else { Ok(Async::NotReady) } } Ok(Async::Ready(())) => Ok(Async::Ready(None)), Err(_) => Err(ErrorInternalServerError("error").into()), } } } impl ToEnvelope for HttpContext where A: Actor>, { fn pack(msg: M, tx: Option>>, channel_on_drop: bool) -> Envelope where A: Handler, M: ResponseType + Send + 'static, M::Item: Send, M::Error: Send { RemoteEnvelope::new(msg, tx, channel_on_drop).into() } } impl From> for Body where A: Actor>, S: 'static { fn from(ctx: HttpContext) -> Body { Body::Actor(Box::new(ctx)) } } pub struct Drain { fut: oneshot::Receiver<()>, _a: PhantomData, } impl Drain { fn new(fut: oneshot::Receiver<()>) -> Self { Drain { fut: fut, _a: PhantomData } } } impl ActorFuture for Drain { type Item = (); type Error = (); type Actor = A; fn poll(&mut self, _: &mut A, _: &mut ::Context) -> Poll { self.fut.poll().map_err(|_| ()) } }