1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2025-01-04 14:28:50 +00:00

move blocking code to actix-rt

This commit is contained in:
Nikolay Kim 2019-03-11 23:11:51 -07:00
parent eae48f9612
commit a2c4639074
4 changed files with 4 additions and 103 deletions

View file

@ -64,7 +64,7 @@ actix-codec = "0.1.0"
actix-service = "0.3.3" actix-service = "0.3.3"
actix-utils = "0.3.3" actix-utils = "0.3.3"
actix-router = "0.1.0" actix-router = "0.1.0"
actix-rt = "0.2.0" actix-rt = "0.2.1"
actix-web-codegen = { path="actix-web-codegen" } actix-web-codegen = { path="actix-web-codegen" }
actix-http = { git = "https://github.com/actix/actix-http.git" } actix-http = { git = "https://github.com/actix/actix-http.git" }
@ -78,16 +78,13 @@ encoding = "0.2"
futures = "0.1" futures = "0.1"
hashbrown = "0.1.8" hashbrown = "0.1.8"
log = "0.4" log = "0.4"
lazy_static = "1.2"
mime = "0.3" mime = "0.3"
net2 = "0.2.33" net2 = "0.2.33"
num_cpus = "1.10"
parking_lot = "0.7" parking_lot = "0.7"
regex = "1.0" regex = "1.0"
serde = "1.0" serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
serde_urlencoded = "^0.5.3" serde_urlencoded = "^0.5.3"
threadpool = "1.7"
time = "0.1" time = "0.1"
url = { version="1.7", features=["query_encoding"] } url = { version="1.7", features=["query_encoding"] }

View file

@ -1,93 +0,0 @@
//! Thread pool for blocking operations
use std::fmt;
use derive_more::Display;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use parking_lot::Mutex;
use threadpool::ThreadPool;
use crate::ResponseError;
/// Env variable for default cpu pool size
const ENV_CPU_POOL_VAR: &str = "ACTIX_CPU_POOL";
lazy_static::lazy_static! {
pub(crate) static ref DEFAULT_POOL: Mutex<ThreadPool> = {
let default = match std::env::var(ENV_CPU_POOL_VAR) {
Ok(val) => {
if let Ok(val) = val.parse() {
val
} else {
log::error!("Can not parse ACTIX_CPU_POOL value");
num_cpus::get() * 5
}
}
Err(_) => num_cpus::get() * 5,
};
Mutex::new(
threadpool::Builder::new()
.thread_name("actix-web".to_owned())
.num_threads(default)
.build(),
)
};
}
thread_local! {
static POOL: ThreadPool = {
DEFAULT_POOL.lock().clone()
};
}
/// Blocking operation execution error
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
/// Execute blocking function on a thread pool, returns future that resolves
/// to result of the function execution.
pub fn run<F, I, E>(f: F) -> CpuFuture<I, E>
where
F: FnOnce() -> Result<I, E> + Send + 'static,
I: Send + 'static,
E: Send + fmt::Debug + 'static,
{
let (tx, rx) = oneshot::channel();
POOL.with(|pool| {
pool.execute(move || {
if !tx.is_canceled() {
let _ = tx.send(f());
}
})
});
CpuFuture { rx }
}
/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
pub struct CpuFuture<I, E> {
rx: oneshot::Receiver<Result<I, E>>,
}
impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
type Item = I;
type Error = BlockingError<E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let res =
futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled));
match res {
Ok(val) => Ok(Async::Ready(val)),
Err(err) => Err(BlockingError::Error(err)),
}
}
}

View file

@ -4,8 +4,6 @@ pub use actix_http::error::*;
use derive_more::{Display, From}; use derive_more::{Display, From};
use url::ParseError as UrlParseError; use url::ParseError as UrlParseError;
pub use crate::blocking::BlockingError;
/// Errors which can occur when attempting to generate resource uri. /// Errors which can occur when attempting to generate resource uri.
#[derive(Debug, PartialEq, Display, From)] #[derive(Debug, PartialEq, Display, From)]
pub enum UrlGenerationError { pub enum UrlGenerationError {

View file

@ -2,7 +2,6 @@
mod app; mod app;
mod app_service; mod app_service;
mod blocking;
mod config; mod config;
pub mod error; pub mod error;
mod extract; mod extract;
@ -53,7 +52,6 @@ pub mod dev {
//! ``` //! ```
pub use crate::app::AppRouter; pub use crate::app::AppRouter;
pub use crate::blocking::CpuFuture;
pub use crate::config::{AppConfig, ServiceConfig}; pub use crate::config::{AppConfig, ServiceConfig};
pub use crate::info::ConnectionInfo; pub use crate::info::ConnectionInfo;
pub use crate::rmap::ResourceMap; pub use crate::rmap::ResourceMap;
@ -67,6 +65,7 @@ pub mod dev {
Extensions, Payload, PayloadStream, RequestHead, ResponseHead, Extensions, Payload, PayloadStream, RequestHead, ResponseHead,
}; };
pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; pub use actix_router::{Path, ResourceDef, ResourcePath, Url};
pub use actix_rt::blocking::CpuFuture;
pub use actix_server::Server; pub use actix_server::Server;
pub(crate) fn insert_slash(path: &str) -> String { pub(crate) fn insert_slash(path: &str) -> String {
@ -80,12 +79,12 @@ pub mod dev {
pub mod web { pub mod web {
use actix_http::{http::Method, Response}; use actix_http::{http::Method, Response};
use actix_rt::blocking::{self, CpuFuture};
use futures::IntoFuture; use futures::IntoFuture;
pub use actix_http::Response as HttpResponse; pub use actix_http::Response as HttpResponse;
pub use bytes::{Bytes, BytesMut}; pub use bytes::{Bytes, BytesMut};
use crate::blocking::CpuFuture;
use crate::extract::FromRequest; use crate::extract::FromRequest;
use crate::handler::{AsyncFactory, Factory}; use crate::handler::{AsyncFactory, Factory};
use crate::resource::Resource; use crate::resource::Resource;
@ -258,6 +257,6 @@ pub mod web {
I: Send + 'static, I: Send + 'static,
E: Send + std::fmt::Debug + 'static, E: Send + std::fmt::Debug + 'static,
{ {
crate::blocking::run(f) blocking::run(f)
} }
} }