From 7ccacb92ce80aaf5716141d1e81a8f531a0c72fa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Mon, 12 Feb 2018 17:42:10 -0800 Subject: [PATCH] update websocket-chat example --- examples/websocket-chat/src/client.rs | 2 +- examples/websocket-chat/src/main.rs | 36 ++++++++++++++------------ examples/websocket-chat/src/server.rs | 4 +-- examples/websocket-chat/src/session.rs | 33 ++++++++++++----------- 4 files changed, 40 insertions(+), 35 deletions(-) diff --git a/examples/websocket-chat/src/client.rs b/examples/websocket-chat/src/client.rs index 4fe18d8e9..d3b556b6f 100644 --- a/examples/websocket-chat/src/client.rs +++ b/examples/websocket-chat/src/client.rs @@ -29,7 +29,7 @@ fn main() { Arbiter::handle().spawn( TcpStream::connect(&addr, Arbiter::handle()) .and_then(|stream| { - let addr: Addr> = ChatClient::create(|ctx| { + let addr: Addr = ChatClient::create(|ctx| { let (r, w) = stream.split(); ChatClient::add_stream(FramedRead::new(r, codec::ClientChatCodec), ctx); ChatClient{ diff --git a/examples/websocket-chat/src/main.rs b/examples/websocket-chat/src/main.rs index 27957f69d..88e8590ef 100644 --- a/examples/websocket-chat/src/main.rs +++ b/examples/websocket-chat/src/main.rs @@ -26,7 +26,7 @@ mod session; /// This is our websocket route state, this state is shared with all route instances /// via `HttpContext::state()` struct WsChatSessionState { - addr: Addr>, + addr: Addr, } /// Entry point for our route @@ -62,10 +62,10 @@ impl Actor for WsChatSession { // before processing any other events. // HttpContext::state() is instance of WsChatSessionState, state is shared across all // routes within application - let addr: Addr> = ctx.address(); - ctx.state().addr.call( - self, server::Connect{addr: addr.subscriber()}).then( - |res, act, ctx| { + let addr: Addr = ctx.address(); + ctx.state().addr.call(server::Connect{addr: addr.subscriber()}) + .into_actor(self) + .then(|res, act, ctx| { match res { Ok(res) => act.id = res, // something is wrong with chat server @@ -109,17 +109,19 @@ impl Handler for WsChatSession { "/list" => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - ctx.state().addr.call(self, server::ListRooms).then(|res, _, ctx| { - match res { - Ok(rooms) => { - for room in rooms { - ctx.text(room); - } - }, - _ => println!("Something is wrong"), - } - fut::ok(()) - }).wait(ctx) + ctx.state().addr.call(server::ListRooms) + .into_actor(self) + .then(|res, _, ctx| { + match res { + Ok(rooms) => { + for room in rooms { + ctx.text(room); + } + }, + _ => println!("Something is wrong"), + } + fut::ok(()) + }).wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list // of rooms back @@ -172,7 +174,7 @@ fn main() { let sys = actix::System::new("websocket-example"); // Start chat server actor in separate thread - let server: Addr> = Arbiter::start(|_| server::ChatServer::default()); + let server: Addr = Arbiter::start(|_| server::ChatServer::default()); // Start tcp server in separate thread let srv = server.clone(); diff --git a/examples/websocket-chat/src/server.rs b/examples/websocket-chat/src/server.rs index b38fb4601..6cae978f6 100644 --- a/examples/websocket-chat/src/server.rs +++ b/examples/websocket-chat/src/server.rs @@ -15,7 +15,7 @@ use session; #[derive(Message)] #[rtype(usize)] pub struct Connect { - pub addr: SyncSubscriber, + pub addr: Subscriber, } /// Session is disconnected @@ -54,7 +54,7 @@ pub struct Join { /// `ChatServer` manages chat rooms and responsible for coordinating chat session. /// implementation is super primitive pub struct ChatServer { - sessions: HashMap>, + sessions: HashMap>, rooms: HashMap>, rng: RefCell, } diff --git a/examples/websocket-chat/src/session.rs b/examples/websocket-chat/src/session.rs index 50c2701ef..a5642db9f 100644 --- a/examples/websocket-chat/src/session.rs +++ b/examples/websocket-chat/src/session.rs @@ -24,7 +24,7 @@ pub struct ChatSession { /// unique session id id: usize, /// this is address of chat server - addr: Addr>, + addr: Addr, /// Client must send ping at least once per 10 seconds, otherwise we drop connection. hb: Instant, /// joined room @@ -45,8 +45,9 @@ impl Actor for ChatSession { // register self in chat server. `AsyncContext::wait` register // future within context, but context waits until this future resolves // before processing any other events. - let addr: Addr> = ctx.address(); - self.addr.call(self, server::Connect{addr: addr.subscriber()}) + let addr: Addr = ctx.address(); + self.addr.call(server::Connect{addr: addr.subscriber()}) + .into_actor(self) .then(|res, act, ctx| { match res { Ok(res) => act.id = res, @@ -75,15 +76,17 @@ impl StreamHandler for ChatSession { ChatRequest::List => { // Send ListRooms message to chat server and wait for response println!("List rooms"); - self.addr.call(self, server::ListRooms).then(|res, act, ctx| { - match res { - Ok(rooms) => { - act.framed.write(ChatResponse::Rooms(rooms)); - }, - _ => println!("Something is wrong"), - } - actix::fut::ok(()) - }).wait(ctx) + self.addr.call(server::ListRooms) + .into_actor(self) + .then(|res, act, ctx| { + match res { + Ok(rooms) => { + act.framed.write(ChatResponse::Rooms(rooms)); + }, + _ => println!("Something is wrong"), + } + actix::fut::ok(()) + }).wait(ctx) // .wait(ctx) pauses all events in context, // so actor wont receive any new messages until it get list of rooms back }, @@ -121,7 +124,7 @@ impl Handler for ChatSession { /// Helper methods impl ChatSession { - pub fn new(addr: Addr>, + pub fn new(addr: Addr, framed: actix::io::FramedWrite, ChatCodec>) -> ChatSession { ChatSession {id: 0, addr: addr, hb: Instant::now(), room: "Main".to_owned(), framed: framed} @@ -155,11 +158,11 @@ impl ChatSession { /// Define tcp server that will accept incoming tcp connection and create /// chat actors. pub struct TcpServer { - chat: Addr>, + chat: Addr, } impl TcpServer { - pub fn new(s: &str, chat: Addr>) { + pub fn new(s: &str, chat: Addr) { // Create server listener let addr = net::SocketAddr::from_str("127.0.0.1:12345").unwrap(); let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap();