1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-03-10 13:31:10 +00:00
actix-web/src/server/services.rs

263 lines
6.4 KiB
Rust
Raw Normal View History

use std::net;
2018-09-27 03:40:45 +00:00
use std::time::Duration;
2018-08-19 17:47:04 +00:00
2018-09-27 03:40:45 +00:00
use futures::future::{err, ok, FutureResult};
2018-09-08 16:36:38 +00:00
use futures::{Future, Poll};
2018-09-27 03:40:45 +00:00
use tokio_current_thread::spawn;
2018-08-19 17:47:04 +00:00
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
2018-08-22 00:08:23 +00:00
use super::Token;
2018-09-27 03:40:45 +00:00
use counter::CounterGuard;
2018-09-11 16:30:22 +00:00
use service::{NewService, Service};
2018-08-19 17:47:04 +00:00
2018-09-27 03:40:45 +00:00
/// Server message
2018-09-07 20:06:51 +00:00
pub enum ServerMessage {
2018-09-28 00:05:48 +00:00
/// New stream
2018-09-07 20:06:51 +00:00
Connect(net::TcpStream),
2018-09-28 00:05:48 +00:00
/// Gracefull shutdown
2018-09-27 03:40:45 +00:00
Shutdown(Duration),
2018-09-28 00:05:48 +00:00
/// Force shutdown
2018-09-07 20:06:51 +00:00
ForceShutdown,
}
2018-09-27 03:40:45 +00:00
pub trait StreamServiceFactory: Send + Clone + 'static {
2018-09-28 19:22:49 +00:00
type NewService: NewService<Request = TcpStream, Response = ()>;
2018-09-27 03:40:45 +00:00
fn create(&self) -> Self::NewService;
}
pub trait ServiceFactory: Send + Clone + 'static {
2018-09-28 19:22:49 +00:00
type NewService: NewService<Request = ServerMessage, Response = ()>;
2018-09-27 03:40:45 +00:00
fn create(&self) -> Self::NewService;
}
pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str;
2018-09-27 03:40:45 +00:00
fn clone_factory(&self) -> Box<InternalServiceFactory>;
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>>;
2018-09-27 03:40:45 +00:00
}
2018-08-19 17:47:04 +00:00
pub(crate) type BoxedServerService = Box<
Service<
2018-09-27 03:40:45 +00:00
Request = (Option<CounterGuard>, ServerMessage),
2018-08-19 17:47:04 +00:00
Response = (),
Error = (),
2018-09-27 03:40:45 +00:00
Future = FutureResult<(), ()>,
2018-08-19 17:47:04 +00:00
>,
>;
2018-09-27 03:40:45 +00:00
pub(crate) struct StreamService<T> {
2018-09-07 18:35:25 +00:00
service: T,
}
2018-09-27 03:40:45 +00:00
impl<T> StreamService<T> {
pub(crate) fn new(service: T) -> Self {
2018-09-27 03:40:45 +00:00
StreamService { service }
2018-09-07 18:35:25 +00:00
}
2018-08-19 17:47:04 +00:00
}
2018-09-27 03:40:45 +00:00
impl<T> Service for StreamService<T>
2018-08-19 17:47:04 +00:00
where
2018-09-28 19:22:49 +00:00
T: Service<Request = TcpStream, Response = ()>,
2018-08-19 17:47:04 +00:00
T::Future: 'static,
T::Error: 'static,
2018-08-19 17:47:04 +00:00
{
2018-09-27 03:40:45 +00:00
type Request = (Option<CounterGuard>, ServerMessage);
2018-08-19 17:47:04 +00:00
type Response = ();
type Error = ();
2018-09-27 03:40:45 +00:00
type Future = FutureResult<(), ()>;
2018-08-19 17:47:04 +00:00
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
2018-09-08 16:36:38 +00:00
self.service.poll_ready().map_err(|_| ())
2018-08-19 17:47:04 +00:00
}
2018-09-27 03:40:45 +00:00
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
2018-09-07 20:06:51 +00:00
match req {
ServerMessage::Connect(stream) => {
let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
error!("Can not convert to an async tcp stream: {}", e);
});
if let Ok(stream) = stream {
2018-09-28 19:22:49 +00:00
spawn(self.service.call(stream).map_err(|_| ()).map(move |val| {
2018-09-27 03:40:45 +00:00
drop(guard);
val
}));
ok(())
2018-09-07 20:06:51 +00:00
} else {
2018-09-27 03:40:45 +00:00
err(())
2018-09-07 20:06:51 +00:00
}
}
2018-09-27 03:40:45 +00:00
_ => ok(()),
2018-08-19 17:47:04 +00:00
}
}
}
2018-09-27 03:40:45 +00:00
pub(crate) struct ServerService<T> {
service: T,
}
impl<T> ServerService<T> {
fn new(service: T) -> Self {
ServerService { service }
}
}
impl<T> Service for ServerService<T>
where
2018-09-28 19:22:49 +00:00
T: Service<Request = ServerMessage, Response = ()>,
2018-09-27 03:40:45 +00:00
T::Future: 'static,
T::Error: 'static,
{
type Request = (Option<CounterGuard>, ServerMessage);
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|_| ())
}
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
2018-09-28 19:22:49 +00:00
spawn(self.service.call(req).map_err(|_| ()).map(move |val| {
2018-09-27 03:40:45 +00:00
drop(guard);
val
}));
ok(())
}
}
pub(crate) struct ServiceNewService<F: ServiceFactory> {
2018-09-18 03:19:48 +00:00
name: String,
inner: F,
token: Token,
2018-08-19 17:47:04 +00:00
}
2018-09-27 03:40:45 +00:00
impl<F> ServiceNewService<F>
2018-08-19 17:47:04 +00:00
where
2018-09-27 03:40:45 +00:00
F: ServiceFactory,
2018-08-19 17:47:04 +00:00
{
pub(crate) fn create(name: String, token: Token, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, inner, token })
2018-08-19 17:47:04 +00:00
}
}
2018-09-27 03:40:45 +00:00
impl<F> InternalServiceFactory for ServiceNewService<F>
where
F: ServiceFactory,
{
fn name(&self, _: Token) -> &str {
2018-09-27 03:40:45 +00:00
&self.name
}
2018-09-18 03:19:48 +00:00
2018-09-27 03:40:45 +00:00
fn clone_factory(&self) -> Box<InternalServiceFactory> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
token: self.token,
2018-09-27 03:40:45 +00:00
})
}
2018-08-19 17:47:04 +00:00
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token;
2018-09-28 19:22:49 +00:00
Box::new(
self.inner
.create()
.new_service()
.map_err(|_| ())
.map(move |inner| {
let service: BoxedServerService = Box::new(ServerService::new(inner));
vec![(token, service)]
2018-09-28 19:22:49 +00:00
}),
)
2018-09-27 03:40:45 +00:00
}
}
pub(crate) struct StreamNewService<F: StreamServiceFactory> {
name: String,
inner: F,
token: Token,
2018-09-27 03:40:45 +00:00
}
impl<F> StreamNewService<F>
where
F: StreamServiceFactory,
{
pub(crate) fn create(name: String, token: Token, inner: F) -> Box<InternalServiceFactory> {
Box::new(Self { name, token, inner })
2018-09-27 03:40:45 +00:00
}
2018-08-19 17:47:04 +00:00
}
2018-09-27 03:40:45 +00:00
impl<F> InternalServiceFactory for StreamNewService<F>
2018-08-19 17:47:04 +00:00
where
2018-09-27 03:40:45 +00:00
F: StreamServiceFactory,
2018-08-19 17:47:04 +00:00
{
fn name(&self, _: Token) -> &str {
2018-09-18 03:19:48 +00:00
&self.name
}
2018-09-27 03:40:45 +00:00
fn clone_factory(&self) -> Box<InternalServiceFactory> {
2018-08-19 17:47:04 +00:00
Box::new(Self {
2018-09-18 03:19:48 +00:00
name: self.name.clone(),
2018-08-19 17:47:04 +00:00
inner: self.inner.clone(),
token: self.token,
2018-08-19 17:47:04 +00:00
})
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
let token = self.token;
2018-09-28 19:22:49 +00:00
Box::new(
self.inner
.create()
.new_service()
.map_err(|_| ())
.map(move |inner| {
let service: BoxedServerService = Box::new(StreamService::new(inner));
vec![(token, service)]
2018-09-28 19:22:49 +00:00
}),
)
2018-08-19 17:47:04 +00:00
}
}
2018-09-27 03:40:45 +00:00
impl InternalServiceFactory for Box<InternalServiceFactory> {
fn name(&self, token: Token) -> &str {
self.as_ref().name(token)
2018-09-18 03:19:48 +00:00
}
2018-09-27 03:40:45 +00:00
fn clone_factory(&self) -> Box<InternalServiceFactory> {
2018-08-19 17:47:04 +00:00
self.as_ref().clone_factory()
}
fn create(&self) -> Box<Future<Item = Vec<(Token, BoxedServerService)>, Error = ()>> {
2018-08-19 17:47:04 +00:00
self.as_ref().create()
}
}
2018-09-08 21:50:16 +00:00
2018-09-28 00:05:48 +00:00
impl<F, T> ServiceFactory for F
where
F: Fn() -> T + Send + Clone + 'static,
2018-09-28 19:22:49 +00:00
T: NewService<Request = ServerMessage, Response = ()>,
2018-09-28 00:05:48 +00:00
{
type NewService = T;
fn create(&self) -> T {
(self)()
}
}
2018-09-27 03:40:45 +00:00
impl<F, T> StreamServiceFactory for F
2018-09-08 21:50:16 +00:00
where
F: Fn() -> T + Send + Clone + 'static,
2018-09-28 19:22:49 +00:00
T: NewService<Request = TcpStream, Response = ()>,
2018-09-08 21:50:16 +00:00
{
type NewService = T;
fn create(&self) -> T {
(self)()
}
}