1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-24 07:58:07 +00:00
actix-web/actix-http/src/ws/codec.rs

302 lines
9.9 KiB
Rust
Raw Normal View History

2018-12-11 02:08:33 +00:00
use actix_codec::{Decoder, Encoder};
use bitflags::bitflags;
use bytes::{Bytes, BytesMut};
use bytestring::ByteString;
2022-03-10 03:12:29 +00:00
use tracing::error;
2018-10-05 19:47:22 +00:00
2022-01-31 21:22:23 +00:00
use super::{
frame::Parser,
proto::{CloseReason, OpCode},
ProtocolError,
};
2018-10-05 19:47:22 +00:00
/// A WebSocket message.
2018-10-05 19:47:22 +00:00
#[derive(Debug, PartialEq)]
pub enum Message {
/// Text message.
Text(ByteString),
/// Binary message.
Binary(Bytes),
/// Continuation.
Continuation(Item),
/// Ping message.
2019-12-09 01:01:22 +00:00
Ping(Bytes),
/// Pong message.
2019-12-09 01:01:22 +00:00
Pong(Bytes),
/// Close message with optional reason.
2018-10-05 19:47:22 +00:00
Close(Option<CloseReason>),
/// No-op. Useful for low-level services.
Nop,
2018-10-05 19:47:22 +00:00
}
/// A WebSocket frame.
2018-10-10 20:20:00 +00:00
#[derive(Debug, PartialEq)]
pub enum Frame {
/// Text frame. Note that the codec does not validate UTF-8 encoding.
Text(Bytes),
/// Binary frame.
Binary(Bytes),
/// Continuation.
Continuation(Item),
/// Ping message.
2019-12-09 01:01:22 +00:00
Ping(Bytes),
/// Pong message.
2019-12-09 01:01:22 +00:00
Pong(Bytes),
/// Close message with optional reason.
2018-10-10 20:20:00 +00:00
Close(Option<CloseReason>),
}
2021-02-12 00:27:20 +00:00
/// A WebSocket continuation item.
#[derive(Debug, PartialEq)]
pub enum Item {
FirstText(Bytes),
FirstBinary(Bytes),
Continue(Bytes),
Last(Bytes),
}
/// WebSocket protocol codec.
#[derive(Debug, Clone)]
2018-10-05 19:47:22 +00:00
pub struct Codec {
flags: Flags,
2018-10-05 19:47:22 +00:00
max_size: usize,
}
bitflags! {
struct Flags: u8 {
const SERVER = 0b0000_0001;
const CONTINUATION = 0b0000_0010;
const W_CONTINUATION = 0b0000_0100;
}
2018-10-05 19:47:22 +00:00
}
impl Codec {
2021-02-12 00:27:20 +00:00
/// Create new WebSocket frames decoder.
pub const fn new() -> Codec {
2018-10-05 19:47:22 +00:00
Codec {
max_size: 65_536,
flags: Flags::SERVER,
2018-10-05 19:47:22 +00:00
}
}
/// Set max frame size.
2018-10-05 19:47:22 +00:00
///
/// By default max size is set to 64KiB.
#[must_use = "This returns the a new Codec, without modifying the original."]
2018-10-05 19:47:22 +00:00
pub fn max_size(mut self, size: usize) -> Self {
self.max_size = size;
self
}
/// Set decoder to client mode.
///
/// By default decoder works in server mode.
#[must_use = "This returns the a new Codec, without modifying the original."]
2018-10-05 19:47:22 +00:00
pub fn client_mode(mut self) -> Self {
self.flags.remove(Flags::SERVER);
2018-10-05 19:47:22 +00:00
self
}
}
impl Default for Codec {
fn default() -> Self {
Self::new()
}
}
impl Encoder<Message> for Codec {
2018-10-05 19:47:22 +00:00
type Error = ProtocolError;
fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> {
match item {
Message::Text(txt) => Parser::write_message(
dst,
txt,
OpCode::Text,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Binary(bin) => Parser::write_message(
dst,
bin,
OpCode::Binary,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Ping(txt) => Parser::write_message(
dst,
txt,
OpCode::Ping,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Pong(txt) => Parser::write_message(
dst,
txt,
OpCode::Pong,
true,
!self.flags.contains(Flags::SERVER),
),
Message::Close(reason) => {
Parser::write_close(dst, reason, !self.flags.contains(Flags::SERVER))
2018-10-05 19:47:22 +00:00
}
Message::Continuation(cont) => match cont {
Item::FirstText(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
return Err(ProtocolError::ContinuationStarted);
} else {
self.flags.insert(Flags::W_CONTINUATION);
Parser::write_message(
dst,
&data[..],
OpCode::Text,
false,
!self.flags.contains(Flags::SERVER),
)
}
}
Item::FirstBinary(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
return Err(ProtocolError::ContinuationStarted);
} else {
self.flags.insert(Flags::W_CONTINUATION);
Parser::write_message(
dst,
&data[..],
OpCode::Binary,
false,
!self.flags.contains(Flags::SERVER),
)
}
}
Item::Continue(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
Parser::write_message(
dst,
&data[..],
OpCode::Continue,
false,
!self.flags.contains(Flags::SERVER),
)
} else {
return Err(ProtocolError::ContinuationNotStarted);
}
}
Item::Last(data) => {
if self.flags.contains(Flags::W_CONTINUATION) {
self.flags.remove(Flags::W_CONTINUATION);
Parser::write_message(
dst,
&data[..],
OpCode::Continue,
true,
!self.flags.contains(Flags::SERVER),
)
} else {
return Err(ProtocolError::ContinuationNotStarted);
}
}
},
2021-01-04 01:01:35 +00:00
Message::Nop => {}
2018-10-05 19:47:22 +00:00
}
Ok(())
}
}
impl Decoder for Codec {
2018-10-10 20:20:00 +00:00
type Item = Frame;
2018-10-05 19:47:22 +00:00
type Error = ProtocolError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match Parser::parse(src, self.flags.contains(Flags::SERVER), self.max_size) {
2018-10-05 19:47:22 +00:00
Ok(Some((finished, opcode, payload))) => {
// continuation is not supported
if !finished {
return match opcode {
OpCode::Continue => {
if self.flags.contains(Flags::CONTINUATION) {
Ok(Some(Frame::Continuation(Item::Continue(
2021-12-08 06:01:11 +00:00
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationNotStarted)
}
}
OpCode::Binary => {
if !self.flags.contains(Flags::CONTINUATION) {
self.flags.insert(Flags::CONTINUATION);
Ok(Some(Frame::Continuation(Item::FirstBinary(
2021-12-08 06:01:11 +00:00
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationStarted)
}
}
OpCode::Text => {
if !self.flags.contains(Flags::CONTINUATION) {
self.flags.insert(Flags::CONTINUATION);
Ok(Some(Frame::Continuation(Item::FirstText(
2021-12-08 06:01:11 +00:00
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationStarted)
}
}
_ => {
2022-03-10 03:12:29 +00:00
error!("Unfinished fragment {:?}", opcode);
Err(ProtocolError::ContinuationFragment(opcode))
}
};
2018-10-05 19:47:22 +00:00
}
match opcode {
OpCode::Continue => {
if self.flags.contains(Flags::CONTINUATION) {
self.flags.remove(Flags::CONTINUATION);
Ok(Some(Frame::Continuation(Item::Last(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))))
} else {
Err(ProtocolError::ContinuationNotStarted)
}
}
2018-10-05 19:47:22 +00:00
OpCode::Bad => Err(ProtocolError::BadOpCode),
OpCode::Close => {
2018-10-10 20:20:00 +00:00
if let Some(ref pl) = payload {
let close_reason = Parser::parse_close_payload(pl);
Ok(Some(Frame::Close(close_reason)))
} else {
Ok(Some(Frame::Close(None)))
}
2018-10-05 19:47:22 +00:00
}
OpCode::Ping => Ok(Some(Frame::Ping(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Pong => Ok(Some(Frame::Pong(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Binary => Ok(Some(Frame::Binary(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
OpCode::Text => Ok(Some(Frame::Text(
payload.map(|pl| pl.freeze()).unwrap_or_else(Bytes::new),
))),
2018-10-05 19:47:22 +00:00
}
}
Ok(None) => Ok(None),
Err(e) => Err(e),
}
}
}