1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-06-29 18:40:51 +00:00

Expose peer addr via Request::peer_addr() and RequestHead::peer_addr

This commit is contained in:
Nikolay Kim 2019-04-16 09:54:02 -07:00
parent 14252f5ef2
commit a116c4c2c7
13 changed files with 170 additions and 85 deletions

View file

@ -72,8 +72,8 @@ actix-router = "0.1.2"
actix-rt = "0.2.2" actix-rt = "0.2.2"
actix-web-codegen = "0.1.0-alpha.6" actix-web-codegen = "0.1.0-alpha.6"
actix-http = { version = "0.1.0-alpha.5", features=["fail"] } actix-http = { version = "0.1.0-alpha.5", features=["fail"] }
actix-server = "0.4.2" actix-server = "0.4.3"
actix-server-config = "0.1.0" actix-server-config = "0.1.1"
actix-threadpool = "0.1.0" actix-threadpool = "0.1.0"
awc = { version = "0.1.0-alpha.6", optional = true } awc = { version = "0.1.0-alpha.6", optional = true }
@ -81,7 +81,7 @@ bytes = "0.4"
derive_more = "0.14" derive_more = "0.14"
encoding = "0.2" encoding = "0.2"
futures = "0.1" futures = "0.1"
hashbrown = "0.2.1" hashbrown = "0.2.2"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"
net2 = "0.2.33" net2 = "0.2.33"

View file

@ -1,6 +1,10 @@
# Changes # Changes
## [0.1.0] - 2019-04-xx ## [0.1.0] - 2019-04-16
### Added
* Expose peer addr via `Request::peer_addr()` and `RequestHead::peer_addr`
### Changed ### Changed

View file

@ -1,6 +1,6 @@
[package] [package]
name = "actix-http" name = "actix-http"
version = "0.1.0-alpha.5" version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix http primitives" description = "Actix http primitives"
readme = "README.md" readme = "README.md"
@ -26,7 +26,7 @@ path = "src/lib.rs"
default = [] default = []
# openssl # openssl
ssl = ["openssl", "actix-connect/ssl"] ssl = ["openssl", "actix-connect/ssl", "actix-server-config/ssl"]
# brotli encoding, requires c compiler # brotli encoding, requires c compiler
brotli = ["brotli2"] brotli = ["brotli2"]
@ -48,7 +48,7 @@ actix-service = "0.3.6"
actix-codec = "0.1.2" actix-codec = "0.1.2"
actix-connect = "0.1.4" actix-connect = "0.1.4"
actix-utils = "0.3.5" actix-utils = "0.3.5"
actix-server-config = "0.1.0" actix-server-config = "0.1.1"
actix-threadpool = "0.1.0" actix-threadpool = "0.1.0"
base64 = "0.10" base64 = "0.10"
@ -60,7 +60,7 @@ derive_more = "0.14"
either = "1.5.2" either = "1.5.2"
encoding = "0.2" encoding = "0.2"
futures = "0.1" futures = "0.1"
hashbrown = "0.2.0" hashbrown = "0.2.2"
h2 = "0.1.16" h2 = "0.1.16"
http = "0.1.17" http = "0.1.17"
httparse = "1.3" httparse = "1.3"
@ -76,10 +76,10 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
sha1 = "0.6" sha1 = "0.6"
slab = "0.4" slab = "0.4"
serde_urlencoded = "0.5.3" serde_urlencoded = "0.5.5"
time = "0.1" time = "0.1"
tokio-tcp = "0.1.3" tokio-tcp = "0.1.3"
tokio-timer = "0.2" tokio-timer = "0.2.8"
tokio-current-thread = "0.1" tokio-current-thread = "0.1"
trust-dns-resolver = { version="0.11.0", default-features = false } trust-dns-resolver = { version="0.11.0", default-features = false }
@ -96,7 +96,7 @@ openssl = { version="0.10", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "0.2.2" actix-rt = "0.2.2"
actix-server = { version = "0.4.1", features=["ssl"] } actix-server = { version = "0.4.3", features=["ssl"] }
actix-connect = { version = "0.1.4", features=["ssl"] } actix-connect = { version = "0.1.4", features=["ssl"] }
actix-http-test = { version = "0.1.0-alpha.3", features=["ssl"] } actix-http-test = { version = "0.1.0-alpha.3", features=["ssl"] }
env_logger = "0.6" env_logger = "0.6"

View file

@ -1,6 +1,6 @@
#![allow(unused_imports, unused_variables, dead_code)] #![allow(unused_imports, unused_variables, dead_code)]
use std::fmt; use std::io::Write;
use std::io::{self, Write}; use std::{fmt, io, net};
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
use bitflags::bitflags; use bitflags::bitflags;
@ -40,7 +40,6 @@ pub struct Codec {
// encoder part // encoder part
flags: Flags, flags: Flags,
encoder: encoder::MessageEncoder<Response<()>>, encoder: encoder::MessageEncoder<Response<()>>,
// headers_size: u32,
} }
impl Default for Codec { impl Default for Codec {
@ -67,13 +66,11 @@ impl Codec {
}; };
Codec { Codec {
config, config,
flags,
decoder: decoder::MessageDecoder::default(), decoder: decoder::MessageDecoder::default(),
payload: None, payload: None,
version: Version::HTTP_11, version: Version::HTTP_11,
ctype: ConnectionType::Close, ctype: ConnectionType::Close,
flags,
// headers_size: 0,
encoder: encoder::MessageEncoder::default(), encoder: encoder::MessageEncoder::default(),
} }
} }

View file

@ -1,8 +1,9 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::time::Instant; use std::time::Instant;
use std::{fmt, io}; use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; use actix_codec::{Decoder, Encoder, Framed, FramedParts};
use actix_server_config::IoStream;
use actix_service::Service; use actix_service::Service;
use actix_utils::cloneable::CloneableService; use actix_utils::cloneable::CloneableService;
use bitflags::bitflags; use bitflags::bitflags;
@ -81,6 +82,7 @@ where
expect: CloneableService<X>, expect: CloneableService<X>,
upgrade: Option<CloneableService<U>>, upgrade: Option<CloneableService<U>>,
flags: Flags, flags: Flags,
peer_addr: Option<net::SocketAddr>,
error: Option<DispatchError>, error: Option<DispatchError>,
state: State<S, B, X>, state: State<S, B, X>,
@ -161,7 +163,7 @@ impl PartialEq for PollResponse {
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -220,14 +222,15 @@ where
Dispatcher { Dispatcher {
inner: DispatcherState::Normal(InnerDispatcher { inner: DispatcherState::Normal(InnerDispatcher {
io,
codec,
read_buf,
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
payload: None, payload: None,
state: State::None, state: State::None,
error: None, error: None,
peer_addr: io.peer_addr(),
messages: VecDeque::new(), messages: VecDeque::new(),
io,
codec,
read_buf,
service, service,
expect, expect,
upgrade, upgrade,
@ -241,7 +244,7 @@ where
impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U> impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -490,6 +493,7 @@ where
match msg { match msg {
Message::Item(mut req) => { Message::Item(mut req) => {
let pl = self.codec.message_type(); let pl = self.codec.message_type();
req.head_mut().peer_addr = self.peer_addr;
if pl == MessageType::Stream && self.upgrade.is_some() { if pl == MessageType::Stream && self.upgrade.is_some() {
self.messages.push_back(DispatcherMessage::Upgrade(req)); self.messages.push_back(DispatcherMessage::Upgrade(req));
@ -649,7 +653,7 @@ where
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U> impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,

View file

@ -1,8 +1,8 @@
use std::fmt; use std::fmt;
use std::marker::PhantomData; use std::marker::PhantomData;
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::Framed;
use actix_server_config::{Io, ServerConfig as SrvConfig}; use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig};
use actix_service::{IntoNewService, NewService, Service}; use actix_service::{IntoNewService, NewService, Service};
use actix_utils::cloneable::CloneableService; use actix_utils::cloneable::CloneableService;
use futures::future::{ok, FutureResult}; use futures::future::{ok, FutureResult};
@ -104,7 +104,7 @@ where
impl<T, P, S, B, X, U> NewService<SrvConfig> for H1Service<T, P, S, B, X, U> impl<T, P, S, B, X, U> NewService<SrvConfig> for H1Service<T, P, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: NewService<SrvConfig, Request = Request>, S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -161,7 +161,7 @@ where
impl<T, P, S, B, X, U> Future for H1ServiceResponse<T, P, S, B, X, U> impl<T, P, S, B, X, U> Future for H1ServiceResponse<T, P, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: NewService<SrvConfig, Request = Request>, S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -245,7 +245,7 @@ where
impl<T, P, S, B, X, U> Service for H1ServiceHandler<T, P, S, B, X, U> impl<T, P, S, B, X, U> Service for H1ServiceHandler<T, P, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -309,7 +309,7 @@ pub struct OneRequest<T, P> {
impl<T, P> OneRequest<T, P> impl<T, P> OneRequest<T, P>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
{ {
/// Create new `H1SimpleService` instance. /// Create new `H1SimpleService` instance.
pub fn new() -> Self { pub fn new() -> Self {
@ -322,7 +322,7 @@ where
impl<T, P> NewService<SrvConfig> for OneRequest<T, P> impl<T, P> NewService<SrvConfig> for OneRequest<T, P>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
{ {
type Request = Io<T, P>; type Request = Io<T, P>;
type Response = (Request, Framed<T, Codec>); type Response = (Request, Framed<T, Codec>);
@ -348,7 +348,7 @@ pub struct OneRequestService<T, P> {
impl<T, P> Service for OneRequestService<T, P> impl<T, P> Service for OneRequestService<T, P>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
{ {
type Request = Io<T, P>; type Request = Io<T, P>;
type Response = (Request, Framed<T, Codec>); type Response = (Request, Framed<T, Codec>);
@ -372,14 +372,14 @@ where
#[doc(hidden)] #[doc(hidden)]
pub struct OneRequestServiceResponse<T> pub struct OneRequestServiceResponse<T>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
{ {
framed: Option<Framed<T, Codec>>, framed: Option<Framed<T, Codec>>,
} }
impl<T> Future for OneRequestServiceResponse<T> impl<T> Future for OneRequestServiceResponse<T>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
{ {
type Item = (Request, Framed<T, Codec>); type Item = (Request, Framed<T, Codec>);
type Error = ParseError; type Error = ParseError;

View file

@ -1,9 +1,10 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::time::Instant; use std::time::Instant;
use std::{fmt, mem}; use std::{fmt, mem, net};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_server_config::IoStream;
use actix_service::Service; use actix_service::Service;
use actix_utils::cloneable::CloneableService; use actix_utils::cloneable::CloneableService;
use bitflags::bitflags; use bitflags::bitflags;
@ -29,14 +30,11 @@ use crate::response::Response;
const CHUNK_SIZE: usize = 16_384; const CHUNK_SIZE: usize = 16_384;
/// Dispatcher for HTTP/2 protocol /// Dispatcher for HTTP/2 protocol
pub struct Dispatcher< pub struct Dispatcher<T: IoStream, S: Service<Request = Request>, B: MessageBody> {
T: AsyncRead + AsyncWrite,
S: Service<Request = Request>,
B: MessageBody,
> {
service: CloneableService<S>, service: CloneableService<S>,
connection: Connection<T, Bytes>, connection: Connection<T, Bytes>,
config: ServiceConfig, config: ServiceConfig,
peer_addr: Option<net::SocketAddr>,
ka_expire: Instant, ka_expire: Instant,
ka_timer: Option<Delay>, ka_timer: Option<Delay>,
_t: PhantomData<B>, _t: PhantomData<B>,
@ -44,7 +42,7 @@ pub struct Dispatcher<
impl<T, S, B> Dispatcher<T, S, B> impl<T, S, B> Dispatcher<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -56,6 +54,7 @@ where
connection: Connection<T, Bytes>, connection: Connection<T, Bytes>,
config: ServiceConfig, config: ServiceConfig,
timeout: Option<Delay>, timeout: Option<Delay>,
peer_addr: Option<net::SocketAddr>,
) -> Self { ) -> Self {
// let keepalive = config.keep_alive_enabled(); // let keepalive = config.keep_alive_enabled();
// let flags = if keepalive { // let flags = if keepalive {
@ -76,9 +75,10 @@ where
Dispatcher { Dispatcher {
service, service,
config, config,
peer_addr,
connection,
ka_expire, ka_expire,
ka_timer, ka_timer,
connection,
_t: PhantomData, _t: PhantomData,
} }
} }
@ -86,7 +86,7 @@ where
impl<T, S, B> Future for Dispatcher<T, S, B> impl<T, S, B> Future for Dispatcher<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -117,6 +117,7 @@ where
head.method = parts.method; head.method = parts.method;
head.version = parts.version; head.version = parts.version;
head.headers = parts.headers.into(); head.headers = parts.headers.into();
head.peer_addr = self.peer_addr;
tokio_current_thread::spawn(ServiceResponse::<S::Future, B> { tokio_current_thread::spawn(ServiceResponse::<S::Future, B> {
state: ServiceResponseState::ServiceCall( state: ServiceResponseState::ServiceCall(
self.service.call(req), self.service.call(req),

View file

@ -3,7 +3,7 @@ use std::marker::PhantomData;
use std::{io, net}; use std::{io, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_server_config::{Io, ServerConfig as SrvConfig}; use actix_server_config::{Io, IoStream, ServerConfig as SrvConfig};
use actix_service::{IntoNewService, NewService, Service}; use actix_service::{IntoNewService, NewService, Service};
use actix_utils::cloneable::CloneableService; use actix_utils::cloneable::CloneableService;
use bytes::Bytes; use bytes::Bytes;
@ -63,7 +63,7 @@ where
impl<T, P, S, B> NewService<SrvConfig> for H2Service<T, P, S, B> impl<T, P, S, B> NewService<SrvConfig> for H2Service<T, P, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: NewService<SrvConfig, Request = Request>, S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -95,7 +95,7 @@ pub struct H2ServiceResponse<T, P, S: NewService<SrvConfig, Request = Request>,
impl<T, P, S, B> Future for H2ServiceResponse<T, P, S, B> impl<T, P, S, B> Future for H2ServiceResponse<T, P, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: NewService<SrvConfig, Request = Request>, S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
@ -140,7 +140,7 @@ where
impl<T, P, S, B> Service for H2ServiceHandler<T, P, S, B> impl<T, P, S, B> Service for H2ServiceHandler<T, P, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -161,17 +161,20 @@ where
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Self::Request) -> Self::Future {
let io = req.into_parts().0;
let peer_addr = io.peer_addr();
H2ServiceHandlerResponse { H2ServiceHandlerResponse {
state: State::Handshake( state: State::Handshake(
Some(self.srv.clone()), Some(self.srv.clone()),
Some(self.cfg.clone()), Some(self.cfg.clone()),
server::handshake(req.into_parts().0), peer_addr,
server::handshake(io),
), ),
} }
} }
} }
enum State<T: AsyncRead + AsyncWrite, S: Service<Request = Request>, B: MessageBody> enum State<T: IoStream, S: Service<Request = Request>, B: MessageBody>
where where
S::Future: 'static, S::Future: 'static,
{ {
@ -179,13 +182,14 @@ where
Handshake( Handshake(
Option<CloneableService<S>>, Option<CloneableService<S>>,
Option<ServiceConfig>, Option<ServiceConfig>,
Option<net::SocketAddr>,
Handshake<T, Bytes>, Handshake<T, Bytes>,
), ),
} }
pub struct H2ServiceHandlerResponse<T, S, B> pub struct H2ServiceHandlerResponse<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -197,7 +201,7 @@ where
impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B> impl<T, S, B> Future for H2ServiceHandlerResponse<T, S, B>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -210,14 +214,19 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.state { match self.state {
State::Incoming(ref mut disp) => disp.poll(), State::Incoming(ref mut disp) => disp.poll(),
State::Handshake(ref mut srv, ref mut config, ref mut handshake) => { State::Handshake(
match handshake.poll() { ref mut srv,
ref mut config,
ref peer_addr,
ref mut handshake,
) => match handshake.poll() {
Ok(Async::Ready(conn)) => { Ok(Async::Ready(conn)) => {
self.state = State::Incoming(Dispatcher::new( self.state = State::Incoming(Dispatcher::new(
srv.take().unwrap(), srv.take().unwrap(),
conn, conn,
config.take().unwrap(), config.take().unwrap(),
None, None,
peer_addr.clone(),
)); ));
self.poll() self.poll()
} }
@ -226,8 +235,7 @@ where
trace!("H2 handshake error: {}", err); trace!("H2 handshake error: {}", err);
Err(err.into()) Err(err.into())
} }
} },
}
} }
} }
} }

View file

@ -1,4 +1,5 @@
use std::cell::{Ref, RefCell, RefMut}; use std::cell::{Ref, RefCell, RefMut};
use std::net;
use std::rc::Rc; use std::rc::Rc;
use bitflags::bitflags; use bitflags::bitflags;
@ -43,6 +44,7 @@ pub struct RequestHead {
pub version: Version, pub version: Version,
pub headers: HeaderMap, pub headers: HeaderMap,
pub extensions: RefCell<Extensions>, pub extensions: RefCell<Extensions>,
pub peer_addr: Option<net::SocketAddr>,
flags: Flags, flags: Flags,
} }
@ -54,6 +56,7 @@ impl Default for RequestHead {
version: Version::HTTP_11, version: Version::HTTP_11,
headers: HeaderMap::with_capacity(16), headers: HeaderMap::with_capacity(16),
flags: Flags::empty(), flags: Flags::empty(),
peer_addr: None,
extensions: RefCell::new(Extensions::new()), extensions: RefCell::new(Extensions::new()),
} }
} }

View file

@ -1,5 +1,5 @@
use std::cell::{Ref, RefMut}; use std::cell::{Ref, RefMut};
use std::fmt; use std::{fmt, net};
use http::{header, Method, Uri, Version}; use http::{header, Method, Uri, Version};
@ -139,6 +139,7 @@ impl<P> Request<P> {
} }
/// Check if request requires connection upgrade /// Check if request requires connection upgrade
#[inline]
pub fn upgrade(&self) -> bool { pub fn upgrade(&self) -> bool {
if let Some(conn) = self.head().headers.get(header::CONNECTION) { if let Some(conn) = self.head().headers.get(header::CONNECTION) {
if let Ok(s) = conn.to_str() { if let Ok(s) = conn.to_str() {
@ -147,6 +148,15 @@ impl<P> Request<P> {
} }
self.head().method == Method::CONNECT self.head().method == Method::CONNECT
} }
/// Peer socket address
///
/// Peer address is actual socket address, if proxy is used in front of
/// actix http server, then peer address would be address of this proxy.
#[inline]
pub fn peer_addr(&self) -> Option<net::SocketAddr> {
self.head().peer_addr
}
} }
impl<P> fmt::Debug for Request<P> { impl<P> fmt::Debug for Request<P> {

View file

@ -1,8 +1,10 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::{fmt, io}; use std::{fmt, io, net};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_server_config::{Io as ServerIo, Protocol, ServerConfig as SrvConfig}; use actix_server_config::{
Io as ServerIo, IoStream, Protocol, ServerConfig as SrvConfig,
};
use actix_service::{IntoNewService, NewService, Service}; use actix_service::{IntoNewService, NewService, Service};
use actix_utils::cloneable::CloneableService; use actix_utils::cloneable::CloneableService;
use bytes::{Buf, BufMut, Bytes, BytesMut}; use bytes::{Buf, BufMut, Bytes, BytesMut};
@ -128,7 +130,7 @@ where
impl<T, P, S, B, X, U> NewService<SrvConfig> for HttpService<T, P, S, B, X, U> impl<T, P, S, B, X, U> NewService<SrvConfig> for HttpService<T, P, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: NewService<SrvConfig, Request = Request>, S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
@ -182,7 +184,7 @@ pub struct HttpServiceResponse<
impl<T, P, S, B, X, U> Future for HttpServiceResponse<T, P, S, B, X, U> impl<T, P, S, B, X, U> Future for HttpServiceResponse<T, P, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: NewService<SrvConfig, Request = Request>, S: NewService<SrvConfig, Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
@ -268,7 +270,7 @@ where
impl<T, P, S, B, X, U> Service for HttpServiceHandler<T, P, S, B, X, U> impl<T, P, S, B, X, U> Service for HttpServiceHandler<T, P, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -317,6 +319,7 @@ where
let (io, _, proto) = req.into_parts(); let (io, _, proto) = req.into_parts();
match proto { match proto {
Protocol::Http2 => { Protocol::Http2 => {
let peer_addr = io.peer_addr();
let io = Io { let io = Io {
inner: io, inner: io,
unread: None, unread: None,
@ -326,6 +329,7 @@ where
server::handshake(io), server::handshake(io),
self.cfg.clone(), self.cfg.clone(),
self.srv.clone(), self.srv.clone(),
peer_addr,
))), ))),
} }
} }
@ -357,7 +361,7 @@ where
S: Service<Request = Request>, S: Service<Request = Request>,
S::Future: 'static, S::Future: 'static,
S::Error: Into<Error>, S::Error: Into<Error>,
T: AsyncRead + AsyncWrite, T: IoStream,
B: MessageBody, B: MessageBody,
X: Service<Request = Request, Response = Request>, X: Service<Request = Request, Response = Request>,
X::Error: Into<Error>, X::Error: Into<Error>,
@ -376,12 +380,19 @@ where
Option<CloneableService<U>>, Option<CloneableService<U>>,
)>, )>,
), ),
Handshake(Option<(Handshake<Io<T>, Bytes>, ServiceConfig, CloneableService<S>)>), Handshake(
Option<(
Handshake<Io<T>, Bytes>,
ServiceConfig,
CloneableService<S>,
Option<net::SocketAddr>,
)>,
),
} }
pub struct HttpServiceHandlerResponse<T, S, B, X, U> pub struct HttpServiceHandlerResponse<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -399,7 +410,7 @@ const HTTP2_PREFACE: [u8; 14] = *b"PRI * HTTP/2.0";
impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U> impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite, T: IoStream,
S: Service<Request = Request>, S: Service<Request = Request>,
S::Error: Into<Error>, S::Error: Into<Error>,
S::Future: 'static, S::Future: 'static,
@ -437,12 +448,17 @@ where
} }
let (io, buf, cfg, srv, expect, upgrade) = data.take().unwrap(); let (io, buf, cfg, srv, expect, upgrade) = data.take().unwrap();
if buf[..14] == HTTP2_PREFACE[..] { if buf[..14] == HTTP2_PREFACE[..] {
let peer_addr = io.peer_addr();
let io = Io { let io = Io {
inner: io, inner: io,
unread: Some(buf), unread: Some(buf),
}; };
self.state = self.state = State::Handshake(Some((
State::Handshake(Some((server::handshake(io), cfg, srv))); server::handshake(io),
cfg,
srv,
peer_addr,
)));
} else { } else {
self.state = State::H1(h1::Dispatcher::with_timeout( self.state = State::H1(h1::Dispatcher::with_timeout(
io, io,
@ -470,8 +486,8 @@ where
} else { } else {
panic!() panic!()
}; };
let (_, cfg, srv) = data.take().unwrap(); let (_, cfg, srv, peer_addr) = data.take().unwrap();
self.state = State::H2(Dispatcher::new(srv, conn, cfg, None)); self.state = State::H2(Dispatcher::new(srv, conn, cfg, None, peer_addr));
self.poll() self.poll()
} }
} }
@ -523,3 +539,25 @@ impl<T: AsyncWrite> AsyncWrite for Io<T> {
self.inner.write_buf(buf) self.inner.write_buf(buf)
} }
} }
impl<T: IoStream> IoStream for Io<T> {
#[inline]
fn peer_addr(&self) -> Option<net::SocketAddr> {
self.inner.peer_addr()
}
#[inline]
fn set_nodelay(&mut self, nodelay: bool) -> io::Result<()> {
self.inner.set_nodelay(nodelay)
}
#[inline]
fn set_linger(&mut self, dur: Option<std::time::Duration>) -> io::Result<()> {
self.inner.set_linger(dur)
}
#[inline]
fn set_keepalive(&mut self, dur: Option<std::time::Duration>) -> io::Result<()> {
self.inner.set_keepalive(dur)
}
}

View file

@ -4,6 +4,7 @@ use std::io;
use std::str::FromStr; use std::str::FromStr;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_server_config::IoStream;
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use futures::{Async, Poll}; use futures::{Async, Poll};
use http::header::{self, HeaderName, HeaderValue}; use http::header::{self, HeaderName, HeaderValue};
@ -253,3 +254,17 @@ impl AsyncWrite for TestBuffer {
Ok(Async::NotReady) Ok(Async::NotReady)
} }
} }
impl IoStream for TestBuffer {
fn set_nodelay(&mut self, _nodelay: bool) -> io::Result<()> {
Ok(())
}
fn set_linger(&mut self, _dur: Option<std::time::Duration>) -> io::Result<()> {
Ok(())
}
fn set_keepalive(&mut self, _dur: Option<std::time::Duration>) -> io::Result<()> {
Ok(())
}
}

View file

@ -35,7 +35,10 @@ fn test_h1() {
.keep_alive(KeepAlive::Disabled) .keep_alive(KeepAlive::Disabled)
.client_timeout(1000) .client_timeout(1000)
.client_disconnect(1000) .client_disconnect(1000)
.h1(|_| future::ok::<_, ()>(Response::Ok().finish())) .h1(|req: Request| {
assert!(req.peer_addr().is_some());
future::ok::<_, ()>(Response::Ok().finish())
})
}); });
let response = srv.block_on(srv.get("/").send()).unwrap(); let response = srv.block_on(srv.get("/").send()).unwrap();
@ -50,6 +53,7 @@ fn test_h1_2() {
.client_timeout(1000) .client_timeout(1000)
.client_disconnect(1000) .client_disconnect(1000)
.finish(|req: Request| { .finish(|req: Request| {
assert!(req.peer_addr().is_some());
assert_eq!(req.version(), http::Version::HTTP_11); assert_eq!(req.version(), http::Version::HTTP_11);
future::ok::<_, ()>(Response::Ok().finish()) future::ok::<_, ()>(Response::Ok().finish())
}) })
@ -115,6 +119,7 @@ fn test_h2_1() -> std::io::Result<()> {
.and_then( .and_then(
HttpService::build() HttpService::build()
.finish(|req: Request| { .finish(|req: Request| {
assert!(req.peer_addr().is_some());
assert_eq!(req.version(), http::Version::HTTP_2); assert_eq!(req.version(), http::Version::HTTP_2);
future::ok::<_, Error>(Response::Ok().finish()) future::ok::<_, Error>(Response::Ok().finish())
}) })