1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-11-29 21:11:17 +00:00

Fix server websockets big payloads support

This commit is contained in:
Nikolay Kim 2018-03-19 17:27:03 -07:00
parent 35ee5d36d8
commit 6cd40df387
8 changed files with 178 additions and 154 deletions

View file

@ -12,9 +12,6 @@ matrix:
- rust: stable - rust: stable
- rust: beta - rust: beta
- rust: nightly - rust: nightly
allow_failures:
- rust: nightly
- rust: beta
#rust: #rust:
# - 1.21.0 # - 1.21.0

View file

@ -8,6 +8,8 @@
* Allow to set client websocket handshake timeout * Allow to set client websocket handshake timeout
* Fix server websockets big payloads support
## 0.4.9 (2018-03-16) ## 0.4.9 (2018-03-16)

View file

@ -145,9 +145,7 @@ impl HttpResponseParser {
// convert headers // convert headers
let mut hdrs = HeaderMap::new(); let mut hdrs = HeaderMap::new();
for header in headers[..headers_len].iter() { for header in headers[..headers_len].iter() {
let n_start = header.name.as_ptr() as usize - bytes_ptr; if let Ok(name) = HeaderName::try_from(header.name) {
let n_end = n_start + header.name.len();
if let Ok(name) = HeaderName::try_from(slice.slice(n_start, n_end)) {
let v_start = header.value.as_ptr() as usize - bytes_ptr; let v_start = header.value.as_ptr() as usize - bytes_ptr;
let v_end = v_start + header.value.len(); let v_end = v_start + header.value.len();
let value = unsafe { let value = unsafe {

View file

@ -453,167 +453,171 @@ impl<S: 'static, H> ProcessResponse<S, H> {
fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>) fn poll_io(mut self, io: &mut Writer, info: &mut PipelineInfo<S>)
-> Result<PipelineState<S, H>, PipelineState<S, H>> -> Result<PipelineState<S, H>, PipelineState<S, H>>
{ {
if self.drain.is_none() && self.running != RunningState::Paused { loop {
// if task is paused, write buffer is probably full if self.drain.is_none() && self.running != RunningState::Paused {
'outter: loop { // if task is paused, write buffer is probably full
let result = match mem::replace(&mut self.iostate, IOState::Done) { 'inner: loop {
IOState::Response => { let result = match mem::replace(&mut self.iostate, IOState::Done) {
let encoding = self.resp.content_encoding().unwrap_or(info.encoding); IOState::Response => {
let encoding = self.resp.content_encoding().unwrap_or(info.encoding);
let result = match io.start(info.req_mut().get_inner(), let result = match io.start(info.req_mut().get_inner(),
&mut self.resp, encoding) &mut self.resp, encoding)
{ {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
}
};
if let Some(err) = self.resp.error() {
if self.resp.status().is_server_error() {
error!("Error occured during request handling: {}", err);
} else {
warn!("Error occured during request handling: {}", err);
}
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
// always poll stream or actor for the first time
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) => {
self.iostate = IOState::Payload(stream);
continue
},
Body::Actor(ctx) => {
self.iostate = IOState::Actor(ctx);
continue
},
_ => (),
}
result
},
IOState::Payload(mut body) => {
match body.poll() {
Ok(Async::Ready(None)) => {
if let Err(err) = io.write_eof() {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
} }
break };
if let Some(err) = self.resp.error() {
if self.resp.status().is_server_error() {
error!("Error occured during request handling: {}", err);
} else {
warn!("Error occured during request handling: {}", err);
}
if log_enabled!(Debug) {
debug!("{:?}", err);
}
}
// always poll stream or actor for the first time
match self.resp.replace_body(Body::Empty) {
Body::Streaming(stream) => {
self.iostate = IOState::Payload(stream);
continue 'inner
},
Body::Actor(ctx) => {
self.iostate = IOState::Actor(ctx);
continue 'inner
}, },
Ok(Async::Ready(Some(chunk))) => { _ => (),
self.iostate = IOState::Payload(body); }
match io.write(chunk.into()) {
Err(err) => { result
},
IOState::Payload(mut body) => {
match body.poll() {
Ok(Async::Ready(None)) => {
if let Err(err) = io.write_eof() {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp)) return Ok(FinishingMiddlewares::init(info, self.resp))
}, }
Ok(result) => result
}
}
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
},
IOState::Actor(mut ctx) => {
if info.disconnected.take().is_some() {
ctx.disconnected();
}
match ctx.poll() {
Ok(Async::Ready(Some(vec))) => {
if vec.is_empty() {
self.iostate = IOState::Actor(ctx);
break break
} },
let mut res = None; Ok(Async::Ready(Some(chunk))) => {
for frame in vec { self.iostate = IOState::Payload(body);
match frame { match io.write(chunk.into()) {
Frame::Chunk(None) => { Err(err) => {
info.context = Some(ctx); info.error = Some(err.into());
if let Err(err) = io.write_eof() { return Ok(FinishingMiddlewares::init(info, self.resp))
info.error = Some(err.into());
return Ok(
FinishingMiddlewares::init(info, self.resp))
}
break 'outter
}, },
Frame::Chunk(Some(chunk)) => { Ok(result) => result
match io.write(chunk) { }
Err(err) => { }
Ok(Async::NotReady) => {
self.iostate = IOState::Payload(body);
break
},
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
}
}
},
IOState::Actor(mut ctx) => {
if info.disconnected.take().is_some() {
ctx.disconnected();
}
match ctx.poll() {
Ok(Async::Ready(Some(vec))) => {
if vec.is_empty() {
self.iostate = IOState::Actor(ctx);
break
}
let mut res = None;
for frame in vec {
match frame {
Frame::Chunk(None) => {
info.context = Some(ctx);
if let Err(err) = io.write_eof() {
info.error = Some(err.into()); info.error = Some(err.into());
return Ok( return Ok(
FinishingMiddlewares::init(info, self.resp)) FinishingMiddlewares::init(info, self.resp))
}, }
Ok(result) => res = Some(result), break 'inner
} },
}, Frame::Chunk(Some(chunk)) => {
Frame::Drain(fut) => self.drain = Some(fut), match io.write(chunk) {
Err(err) => {
info.error = Some(err.into());
return Ok(
FinishingMiddlewares::init(info, self.resp))
},
Ok(result) => res = Some(result),
}
},
Frame::Drain(fut) => self.drain = Some(fut),
}
} }
self.iostate = IOState::Actor(ctx);
if self.drain.is_some() {
self.running.resume();
break 'inner
}
res.unwrap()
},
Ok(Async::Ready(None)) => {
break
} }
self.iostate = IOState::Actor(ctx); Ok(Async::NotReady) => {
if self.drain.is_some() { self.iostate = IOState::Actor(ctx);
self.running.resume(); break
break 'outter }
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
} }
res.unwrap()
},
Ok(Async::Ready(None)) => {
break
}
Ok(Async::NotReady) => {
self.iostate = IOState::Actor(ctx);
break
}
Err(err) => {
info.error = Some(err);
return Ok(FinishingMiddlewares::init(info, self.resp))
} }
} }
} IOState::Done => break,
IOState::Done => break, };
};
match result { match result {
WriterState::Pause => { WriterState::Pause => {
self.running.pause(); self.running.pause();
break break
}
WriterState::Done => {
self.running.resume()
},
} }
WriterState::Done => { }
self.running.resume() }
// flush io but only if we need to
if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) {
Ok(Async::Ready(_)) => {
self.running.resume();
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
}
// restart io processing
continue
}, },
} Ok(Async::NotReady) =>
} return Err(PipelineState::Response(self)),
} Err(err) => {
info.error = Some(err.into());
// flush io but only if we need to return Ok(FinishingMiddlewares::init(info, self.resp))
if self.running == RunningState::Paused || self.drain.is_some() {
match io.poll_completed(false) {
Ok(Async::Ready(_)) => {
self.running.resume();
// resolve drain futures
if let Some(tx) = self.drain.take() {
let _ = tx.send(());
} }
// restart io processing
return self.poll_io(io, info);
},
Ok(Async::NotReady) => return Err(PipelineState::Response(self)),
Err(err) => {
info.error = Some(err.into());
return Ok(FinishingMiddlewares::init(info, self.resp))
} }
} }
break
} }
// response is completed // response is completed

View file

@ -82,7 +82,9 @@ impl<T: AsyncWrite, H: 'static> H1Writer<T, H> {
self.disconnected(); self.disconnected();
return Err(io::Error::new(io::ErrorKind::WriteZero, "")) return Err(io::Error::new(io::ErrorKind::WriteZero, ""))
}, },
Ok(n) => written += n, Ok(n) => {
written += n;
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
return Ok(written) return Ok(written)
} }
@ -229,7 +231,7 @@ impl<T: AsyncWrite, H: 'static> Writer for H1Writer<T, H> {
if self.buffer.is_empty() { if self.buffer.is_empty() {
let pl: &[u8] = payload.as_ref(); let pl: &[u8] = payload.as_ref();
let n = self.write_data(pl)?; let n = self.write_data(pl)?;
if pl.len() < n { if n < pl.len() {
self.buffer.extend_from_slice(&pl[n..]); self.buffer.extend_from_slice(&pl[n..]);
return Ok(WriterState::Done); return Ok(WriterState::Done);
} }

View file

@ -454,13 +454,13 @@ impl Stream for ClientReader {
// read // read
match Frame::parse(&mut inner.rx, false, max_size) { match Frame::parse(&mut inner.rx, false, max_size) {
Ok(Async::Ready(Some(frame))) => { Ok(Async::Ready(Some(frame))) => {
let (finished, opcode, payload) = frame.unpack(); let (_finished, opcode, payload) = frame.unpack();
match opcode { match opcode {
// continuation is not supported // continuation is not supported
OpCode::Continue => { OpCode::Continue => {
inner.closed = true; inner.closed = true;
return Err(ProtocolError::NoContinuation) Err(ProtocolError::NoContinuation)
}, },
OpCode::Bad => { OpCode::Bad => {
inner.closed = true; inner.closed = true;

View file

@ -329,7 +329,8 @@ impl<S> Stream for WsStream<S> where S: Stream<Item=Bytes, Error=PayloadError> {
match String::from_utf8(tmp) { match String::from_utf8(tmp) {
Ok(s) => Ok(s) =>
Ok(Async::Ready(Some(Message::Text(s)))), Ok(Async::Ready(Some(Message::Text(s)))),
Err(_) => { Err(e) => {
println!("ENC: {:?}", e);
self.closed = true; self.closed = true;
Err(ProtocolError::BadEncoding) Err(ProtocolError::BadEncoding)
} }

View file

@ -93,7 +93,8 @@ fn test_large_bin() {
} }
struct Ws2 { struct Ws2 {
count: usize count: usize,
bin: bool,
} }
impl Actor for Ws2 { impl Actor for Ws2 {
@ -106,10 +107,14 @@ impl Actor for Ws2 {
impl Ws2 { impl Ws2 {
fn send(&mut self, ctx: &mut ws::WebsocketContext<Self>) { fn send(&mut self, ctx: &mut ws::WebsocketContext<Self>) {
ctx.text("0".repeat(65_536)); if self.bin {
ctx.binary(Vec::from("0".repeat(65_536)));
} else {
ctx.text("0".repeat(65_536));
}
ctx.drain().and_then(|_, act, ctx| { ctx.drain().and_then(|_, act, ctx| {
act.count += 1; act.count += 1;
if act.count != 100 { if act.count != 10_000 {
act.send(ctx); act.send(ctx);
} }
actix::fut::ok(()) actix::fut::ok(())
@ -135,10 +140,25 @@ fn test_server_send_text() {
let data = Some(ws::Message::Text("0".repeat(65_536))); let data = Some(ws::Message::Text("0".repeat(65_536)));
let mut srv = test::TestServer::new( let mut srv = test::TestServer::new(
|app| app.handler(|req| ws::start(req, Ws2{count:0}))); |app| app.handler(|req| ws::start(req, Ws2{count:0, bin: false})));
let (mut reader, _writer) = srv.ws().unwrap(); let (mut reader, _writer) = srv.ws().unwrap();
for _ in 0..100 { for _ in 0..10_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r;
assert_eq!(item, data);
}
}
#[test]
fn test_server_send_bin() {
let data = Some(ws::Message::Binary(Binary::from("0".repeat(65_536))));
let mut srv = test::TestServer::new(
|app| app.handler(|req| ws::start(req, Ws2{count:0, bin: true})));
let (mut reader, _writer) = srv.ws().unwrap();
for _ in 0..10_000 {
let (item, r) = srv.execute(reader.into_future()).unwrap(); let (item, r) = srv.execute(reader.into_future()).unwrap();
reader = r; reader = r;
assert_eq!(item, data); assert_eq!(item, data);