1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-29 04:51:13 +00:00

update actix; update examples

This commit is contained in:
Nikolay Kim 2017-10-21 15:21:16 -07:00
parent 0865071dd7
commit 4560ca17b2
10 changed files with 112 additions and 109 deletions

View file

@ -18,8 +18,13 @@ impl Route for MyRoute {
fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> { fn request(req: HttpRequest, payload: Payload, ctx: &mut HttpContext<Self>) -> Reply<Self> {
println!("{:?}", req); println!("{:?}", req);
let multipart = match req.multipart(payload) {
Ok(mp) => mp,
Err(e) => return Reply::reply(e),
};
// get Multipart stream // get Multipart stream
WrapStream::<MyRoute>::actstream(req.multipart(payload)?) WrapStream::<MyRoute>::actstream(multipart)
.and_then(|item, act, ctx| { .and_then(|item, act, ctx| {
// Multipart stream is a stream of Fields and nested Multiparts // Multipart stream is a stream of Fields and nested Multiparts
match item { match item {

View file

@ -58,6 +58,11 @@ struct ChatClient;
struct ClientCommand(String); struct ClientCommand(String);
impl ResponseType for ClientCommand {
type Item = ();
type Error = ();
}
impl Actor for ChatClient { impl Actor for ChatClient {
type Context = FramedContext<Self>; type Context = FramedContext<Self>;
@ -112,11 +117,6 @@ impl Handler<ClientCommand> for ChatClient
} }
} }
impl ResponseType<ClientCommand> for ChatClient {
type Item = ();
type Error = ();
}
/// Server communication /// Server communication
impl FramedActor for ChatClient { impl FramedActor for ChatClient {
@ -134,11 +134,6 @@ impl StreamHandler<codec::ChatResponse, io::Error> for ChatClient {
} }
} }
impl ResponseType<codec::ChatResponse> for ChatClient {
type Item = ();
type Error = ();
}
impl Handler<codec::ChatResponse, io::Error> for ChatClient { impl Handler<codec::ChatResponse, io::Error> for ChatClient {
fn handle(&mut self, msg: codec::ChatResponse, _: &mut FramedContext<Self>) fn handle(&mut self, msg: codec::ChatResponse, _: &mut FramedContext<Self>)

View file

@ -4,7 +4,7 @@ use serde_json as json;
use byteorder::{BigEndian , ByteOrder}; use byteorder::{BigEndian , ByteOrder};
use bytes::{BytesMut, BufMut}; use bytes::{BytesMut, BufMut};
use tokio_io::codec::{Encoder, Decoder}; use tokio_io::codec::{Encoder, Decoder};
use actix::ResponseType;
/// Client request /// Client request
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -20,6 +20,11 @@ pub enum ChatRequest {
Ping Ping
} }
impl ResponseType for ChatRequest {
type Item = ();
type Error = ();
}
/// Server response /// Server response
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[serde(tag="cmd", content="data")] #[serde(tag="cmd", content="data")]
@ -36,6 +41,10 @@ pub enum ChatResponse {
Message(String), Message(String),
} }
impl ResponseType for ChatResponse {
type Item = ();
type Error = ();
}
/// Codec for Client -> Server transport /// Codec for Client -> Server transport
pub struct ChatCodec; pub struct ChatCodec;

View file

@ -2,6 +2,7 @@
extern crate rand; extern crate rand;
extern crate bytes; extern crate bytes;
extern crate byteorder; extern crate byteorder;
extern crate futures;
extern crate tokio_io; extern crate tokio_io;
extern crate tokio_core; extern crate tokio_core;
extern crate env_logger; extern crate env_logger;
@ -78,11 +79,6 @@ impl Handler<session::Message> for WsChatSession {
} }
} }
impl ResponseType<session::Message> for WsChatSession {
type Item = ();
type Error = ();
}
/// WebSocket message handler /// WebSocket message handler
impl Handler<ws::Message> for WsChatSession { impl Handler<ws::Message> for WsChatSession {
fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>) fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>)
@ -194,11 +190,6 @@ impl StreamHandler<ws::Message> for WsChatSession
} }
} }
impl ResponseType<ws::Message> for WsChatSession {
type Item = ();
type Error = ();
}
fn main() { fn main() {
let _ = env_logger::init(); let _ = env_logger::init();

View file

@ -16,11 +16,24 @@ pub struct Connect {
pub addr: Box<Subscriber<session::Message> + Send>, pub addr: Box<Subscriber<session::Message> + Send>,
} }
/// Response type for Connect message
///
/// Chat server returns unique session id
impl ResponseType for Connect {
type Item = usize;
type Error = ();
}
/// Session is disconnected /// Session is disconnected
pub struct Disconnect { pub struct Disconnect {
pub id: usize, pub id: usize,
} }
impl ResponseType for Disconnect {
type Item = ();
type Error = ();
}
/// Send message to specific room /// Send message to specific room
pub struct Message { pub struct Message {
/// Id of the client session /// Id of the client session
@ -31,9 +44,19 @@ pub struct Message {
pub room: String, pub room: String,
} }
impl ResponseType for Message {
type Item = ();
type Error = ();
}
/// List of available rooms /// List of available rooms
pub struct ListRooms; pub struct ListRooms;
impl ResponseType for ListRooms {
type Item = Vec<String>;
type Error = ();
}
/// Join room, if room does not exists create new one. /// Join room, if room does not exists create new one.
pub struct Join { pub struct Join {
/// Client id /// Client id
@ -42,6 +65,11 @@ pub struct Join {
pub name: String, pub name: String,
} }
impl ResponseType for Join {
type Item = ();
type Error = ();
}
/// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// `ChatServer` manages chat rooms and responsible for coordinating chat session.
/// implementation is super primitive /// implementation is super primitive
pub struct ChatServer { pub struct ChatServer {
@ -109,15 +137,6 @@ impl Handler<Connect> for ChatServer {
} }
} }
impl ResponseType<Connect> for ChatServer {
/// Response type for Connect message
///
/// Chat server returns unique session id
type Item = usize;
type Error = ();
}
/// Handler for Disconnect message. /// Handler for Disconnect message.
impl Handler<Disconnect> for ChatServer { impl Handler<Disconnect> for ChatServer {
@ -144,11 +163,6 @@ impl Handler<Disconnect> for ChatServer {
} }
} }
impl ResponseType<Disconnect> for ChatServer {
type Item = ();
type Error = ();
}
/// Handler for Message message. /// Handler for Message message.
impl Handler<Message> for ChatServer { impl Handler<Message> for ChatServer {
@ -159,11 +173,6 @@ impl Handler<Message> for ChatServer {
} }
} }
impl ResponseType<Message> for ChatServer {
type Item = ();
type Error = ();
}
/// Handler for `ListRooms` message. /// Handler for `ListRooms` message.
impl Handler<ListRooms> for ChatServer { impl Handler<ListRooms> for ChatServer {
@ -178,11 +187,6 @@ impl Handler<ListRooms> for ChatServer {
} }
} }
impl ResponseType<ListRooms> for ChatServer {
type Item = Vec<String>;
type Error = ();
}
/// Join room, send disconnect message to old room /// Join room, send disconnect message to old room
/// send join message to new room /// send join message to new room
impl Handler<Join> for ChatServer { impl Handler<Join> for ChatServer {
@ -211,8 +215,3 @@ impl Handler<Join> for ChatServer {
Self::empty() Self::empty()
} }
} }
impl ResponseType<Join> for ChatServer {
type Item = ();
type Error = ();
}

View file

@ -3,6 +3,7 @@
use std::{io, net}; use std::{io, net};
use std::str::FromStr; use std::str::FromStr;
use std::time::{Instant, Duration}; use std::time::{Instant, Duration};
use futures::Stream;
use tokio_core::net::{TcpStream, TcpListener}; use tokio_core::net::{TcpStream, TcpListener};
use actix::*; use actix::*;
@ -14,6 +15,10 @@ use codec::{ChatRequest, ChatResponse, ChatCodec};
/// Chat server sends this messages to session /// Chat server sends this messages to session
pub struct Message(pub String); pub struct Message(pub String);
impl ResponseType for Message {
type Item = ();
type Error = ();
}
/// `ChatSession` actor is responsible for tcp peer communitions. /// `ChatSession` actor is responsible for tcp peer communitions.
pub struct ChatSession { pub struct ChatSession {
@ -68,11 +73,6 @@ impl StreamHandler<ChatRequest, io::Error> for ChatSession {
} }
} }
impl ResponseType<ChatRequest> for ChatSession {
type Item = ();
type Error = ();
}
impl Handler<ChatRequest, io::Error> for ChatSession { impl Handler<ChatRequest, io::Error> for ChatSession {
/// We'll stop chat session actor on any error, high likely it is just /// We'll stop chat session actor on any error, high likely it is just
@ -137,12 +137,6 @@ impl Handler<Message> for ChatSession {
} }
} }
impl ResponseType<Message> for ChatSession {
type Item = ();
type Error = ();
}
/// Helper methods /// Helper methods
impl ChatSession { impl ChatSession {
@ -194,7 +188,7 @@ impl TcpServer {
// So to be able to handle this events `Server` actor has to implement // So to be able to handle this events `Server` actor has to implement
// stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>` // stream handler `StreamHandler<(TcpStream, net::SocketAddr), io::Error>`
let _: () = TcpServer::create(|ctx| { let _: () = TcpServer::create(|ctx| {
ctx.add_stream(listener.incoming()); ctx.add_stream(listener.incoming().map(|(t, a)| TcpConnect(t, a)));
TcpServer{chat: chat} TcpServer{chat: chat}
}); });
} }
@ -206,18 +200,19 @@ impl Actor for TcpServer {
type Context = Context<Self>; type Context = Context<Self>;
} }
/// Handle stream of TcpStream's struct TcpConnect(TcpStream, net::SocketAddr);
impl StreamHandler<(TcpStream, net::SocketAddr), io::Error> for TcpServer {}
impl ResponseType<(TcpStream, net::SocketAddr)> for TcpServer { impl ResponseType for TcpConnect {
type Item = (); type Item = ();
type Error = (); type Error = ();
} }
impl Handler<(TcpStream, net::SocketAddr), io::Error> for TcpServer { /// Handle stream of TcpStream's
impl StreamHandler<TcpConnect, io::Error> for TcpServer {}
fn handle(&mut self, msg: (TcpStream, net::SocketAddr), _: &mut Context<Self>) impl Handler<TcpConnect, io::Error> for TcpServer {
-> Response<Self, (TcpStream, net::SocketAddr)>
fn handle(&mut self, msg: TcpConnect, _: &mut Context<Self>) -> Response<Self, TcpConnect>
{ {
// For each incoming connection we create `ChatSession` actor // For each incoming connection we create `ChatSession` actor
// with out chat server address. // with out chat server address.

View file

@ -30,12 +30,15 @@ impl Route for MyWebSocket {
} }
} }
impl ResponseType<ws::Message> for MyWebSocket { impl StreamHandler<ws::Message> for MyWebSocket {
type Item = (); fn started(&mut self, ctx: &mut Self::Context) {
type Error = (); println!("WebSocket session openned");
} }
impl StreamHandler<ws::Message> for MyWebSocket {} fn finished(&mut self, ctx: &mut Self::Context) {
println!("WebSocket session closed");
}
}
impl Handler<ws::Message> for MyWebSocket { impl Handler<ws::Message> for MyWebSocket {
fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>) fn handle(&mut self, msg: ws::Message, ctx: &mut HttpContext<Self>)

View file

@ -20,6 +20,7 @@ pub struct HttpContext<A> where A: Actor<Context=HttpContext<A>> + Route,
{ {
act: Option<A>, act: Option<A>,
state: ActorState, state: ActorState,
modified: bool,
items: ActorItemsCell<A>, items: ActorItemsCell<A>,
address: ActorAddressCell<A>, address: ActorAddressCell<A>,
stream: VecDeque<Frame>, stream: VecDeque<Frame>,
@ -57,16 +58,19 @@ impl<A> AsyncContext<A> for HttpContext<A> where A: Actor<Context=Self> + Route
fn spawn<F>(&mut self, fut: F) -> SpawnHandle fn spawn<F>(&mut self, fut: F) -> SpawnHandle
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{ {
self.modified = true;
self.items.spawn(fut) self.items.spawn(fut)
} }
fn wait<F>(&mut self, fut: F) fn wait<F>(&mut self, fut: F)
where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static where F: ActorFuture<Item=(), Error=(), Actor=A> + 'static
{ {
self.modified = true;
self.wait.add(fut); self.wait.add(fut);
} }
fn cancel_future(&mut self, handle: SpawnHandle) -> bool { fn cancel_future(&mut self, handle: SpawnHandle) -> bool {
self.modified = true;
self.items.cancel_future(handle) self.items.cancel_future(handle)
} }
} }
@ -85,6 +89,7 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
HttpContext { HttpContext {
act: None, act: None,
state: ActorState::Started, state: ActorState::Started,
modified: false,
items: ActorItemsCell::default(), items: ActorItemsCell::default(),
address: ActorAddressCell::default(), address: ActorAddressCell::default(),
wait: ActorWaitCell::default(), wait: ActorWaitCell::default(),
@ -124,17 +129,19 @@ impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
impl<A> HttpContext<A> where A: Actor<Context=Self> + Route { impl<A> HttpContext<A> where A: Actor<Context=Self> + Route {
#[doc(hidden)] #[doc(hidden)]
pub fn subscriber<M: 'static>(&mut self) -> Box<Subscriber<M>> pub fn subscriber<M>(&mut self) -> Box<Subscriber<M>>
where A: Handler<M> where A: Handler<M>,
M: ResponseType + 'static,
{ {
Box::new(self.address.unsync_address()) Box::new(self.address.unsync_address())
} }
#[doc(hidden)] #[doc(hidden)]
pub fn sync_subscriber<M: 'static + Send>(&mut self) -> Box<Subscriber<M> + Send> pub fn sync_subscriber<M>(&mut self) -> Box<Subscriber<M> + Send>
where A: Handler<M>, where A: Handler<M>,
A::Item: Send, M: ResponseType + Send + 'static,
A::Error: Send, M::Item: Send,
M::Error: Send,
{ {
Box::new(self.address.sync_address()) Box::new(self.address.sync_address())
} }
@ -170,28 +177,23 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
_ => () _ => ()
} }
// check wait futures
if self.wait.poll(act, ctx) {
return Ok(Async::NotReady)
}
let mut prep_stop = false; let mut prep_stop = false;
loop { loop {
let mut not_ready = true; self.modified = false;
if self.address.poll(act, ctx) {
not_ready = false
}
self.items.poll(act, ctx);
// check wait futures // check wait futures
if self.wait.poll(act, ctx) { if self.wait.poll(act, ctx) {
return Ok(Async::NotReady) return Ok(Async::NotReady)
} }
// incoming messages
self.address.poll(act, ctx);
// spawned futures and streams
self.items.poll(act, ctx);
// are we done // are we done
if !not_ready { if self.modified {
continue continue
} }
@ -239,15 +241,13 @@ impl<A> Stream for HttpContext<A> where A: Actor<Context=Self> + Route
} }
} }
type ToEnvelopeSender<A, M> =
Sender<Result<<A as ResponseType<M>>::Item, <A as ResponseType<M>>::Error>>;
impl<A, M> ToEnvelope<A, M> for HttpContext<A> impl<A, M> ToEnvelope<A, M> for HttpContext<A>
where M: Send + 'static, where A: Actor<Context=HttpContext<A>> + Route + Handler<M>,
A: Actor<Context=HttpContext<A>> + Route + Handler<M>, M: ResponseType + Send + 'static,
<A as ResponseType<M>>::Item: Send, <A as ResponseType<M>>::Item: Send M::Item: Send,
M::Error: Send,
{ {
fn pack(msg: M, tx: Option<ToEnvelopeSender<A, M>>) -> Envelope<A> fn pack(msg: M, tx: Option<Sender<Result<M::Item, M::Error>>>) -> Envelope<A>
{ {
RemoteEnvelope::new(msg, tx).into() RemoteEnvelope::new(msg, tx).into()
} }

View file

@ -46,7 +46,7 @@ impl<T, A> HttpServer<T, A>
S: Stream<Item=(T, A), Error=io::Error> + 'static S: Stream<Item=(T, A), Error=io::Error> + 'static
{ {
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
ctx.add_stream(stream); ctx.add_stream(stream.map(|(t, a)| IoStream(t, a)));
self self
})) }))
} }
@ -81,7 +81,7 @@ impl HttpServer<TcpStream, net::SocketAddr> {
} else { } else {
Ok(HttpServer::create(move |ctx| { Ok(HttpServer::create(move |ctx| {
for tcp in addrs { for tcp in addrs {
ctx.add_stream(tcp.incoming()); ctx.add_stream(tcp.incoming().map(|(t, a)| IoStream(t, a)));
} }
self self
})) }))
@ -89,7 +89,9 @@ impl HttpServer<TcpStream, net::SocketAddr> {
} }
} }
impl<T, A> ResponseType<(T, A)> for HttpServer<T, A> struct IoStream<T, A>(T, A);
impl<T, A> ResponseType for IoStream<T, A>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
A: 'static A: 'static
{ {
@ -97,16 +99,15 @@ impl<T, A> ResponseType<(T, A)> for HttpServer<T, A>
type Error = (); type Error = ();
} }
impl<T, A> StreamHandler<(T, A), io::Error> for HttpServer<T, A> impl<T, A> StreamHandler<IoStream<T, A>, io::Error> for HttpServer<T, A>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static, A: 'static {}
A: 'static {
}
impl<T, A> Handler<(T, A), io::Error> for HttpServer<T, A> impl<T, A> Handler<IoStream<T, A>, io::Error> for HttpServer<T, A>
where T: AsyncRead + AsyncWrite + 'static, where T: AsyncRead + AsyncWrite + 'static,
A: 'static A: 'static
{ {
fn handle(&mut self, msg: (T, A), _: &mut Context<Self>) -> Response<Self, (T, A)> fn handle(&mut self, msg: IoStream<T, A>, _: &mut Context<Self>)
-> Response<Self, IoStream<T, A>>
{ {
Arbiter::handle().spawn( Arbiter::handle().spawn(
HttpChannel{router: Rc::clone(&self.router), HttpChannel{router: Rc::clone(&self.router),

View file

@ -69,7 +69,7 @@ use http::{Method, StatusCode, header};
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use futures::{Async, Poll, Stream}; use futures::{Async, Poll, Stream};
use actix::Actor; use actix::{Actor, ResponseType};
use context::HttpContext; use context::HttpContext;
use route::Route; use route::Route;
@ -103,6 +103,11 @@ pub enum Message {
Error Error
} }
impl ResponseType for Message {
type Item = ();
type Error = ();
}
/// Prepare `WebSocket` handshake response. /// Prepare `WebSocket` handshake response.
/// ///
/// This function returns handshake `HttpResponse`, ready to send to peer. /// This function returns handshake `HttpResponse`, ready to send to peer.