diff --git a/src/context.rs b/src/context.rs index d7dc2b30d..4b03c2b57 100644 --- a/src/context.rs +++ b/src/context.rs @@ -6,7 +6,7 @@ use futures::{Async, Stream, Poll}; use bytes::Bytes; use actix::{Actor, ActorState, ActorContext, AsyncActorContext}; use actix::fut::ActorFuture; -use actix::dev::{AsyncContextApi, ActorAddressCell}; +use actix::dev::{AsyncContextApi, ActorAddressCell, ActorItemsCell, SpawnHandle}; use route::{Route, Frame}; use httpmessage::HttpResponse; @@ -17,7 +17,7 @@ pub struct HttpContext where A: Actor> + Route, { act: Option, state: ActorState, - items: Vec>>, + items: ActorItemsCell, address: ActorAddressCell, stream: VecDeque, app_state: Rc<::State>, @@ -37,7 +37,7 @@ impl ActorContext for HttpContext where A: Actor + Route /// Terminate actor execution fn terminate(&mut self) { self.address.close(); - self.items.clear(); + self.items.close(); self.state = ActorState::Stopped; } @@ -49,14 +49,14 @@ impl ActorContext for HttpContext where A: Actor + Route impl AsyncActorContext for HttpContext where A: Actor + Route { - fn spawn(&mut self, fut: F) + fn spawn(&mut self, fut: F) -> SpawnHandle where F: ActorFuture + 'static { - if self.state == ActorState::Stopped { - error!("Context::spawn called for stopped actor."); - } else { - self.items.push(Box::new(fut)) - } + self.items.spawn(fut) + } + + fn cancel_future(&mut self, handle: SpawnHandle) -> bool { + self.items.cancel_future(handle) } } @@ -74,7 +74,7 @@ impl HttpContext where A: Actor + Route { HttpContext { act: None, state: ActorState::Started, - items: Vec::new(), + items: ActorItemsCell::default(), address: ActorAddressCell::default(), stream: VecDeque::new(), app_state: state, @@ -147,47 +147,7 @@ impl Stream for HttpContext where A: Actor + Route not_ready = false } - // check secondary streams - let mut idx = 0; - let mut len = self.items.len(); - loop { - if idx >= len { - break - } - - let (drop, item) = match self.items[idx].poll(act, ctx) { - Ok(val) => match val { - Async::Ready(_) => { - not_ready = false; - (true, None) - } - Async::NotReady => (false, None), - }, - Err(_) => (true, None) - }; - - // we have new pollable item - if let Some(item) = item { - self.items.push(item); - } - - // number of items could be different, context can add more items - len = self.items.len(); - - // item finishes, we need to remove it, - // replace current item with last item - if drop { - len -= 1; - if idx >= len { - self.items.pop(); - break - } else { - self.items[idx] = self.items.pop().unwrap(); - } - } else { - idx += 1; - } - } + self.items.poll(act, ctx); // are we done if !not_ready { diff --git a/src/wsframe.rs b/src/wsframe.rs index e87079ada..094c596d7 100644 --- a/src/wsframe.rs +++ b/src/wsframe.rs @@ -56,11 +56,6 @@ impl Frame { /// Create a new data frame. #[inline] pub fn message(data: Vec, code: OpCode, finished: bool) -> Frame { - debug_assert!(match code { - OpCode::Text | OpCode::Binary | OpCode::Continue => true, - _ => false, - }, "Invalid opcode for data frame."); - Frame { finished: finished, opcode: code,