diff --git a/Cargo.toml b/Cargo.toml index c64f4bc68..e24392941 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ actix-codec = "0.1.0" actix-service = "0.3.3" actix-utils = "0.3.3" actix-router = "0.1.0" -actix-rt = "0.2.0" +actix-rt = "0.2.1" actix-web-codegen = { path="actix-web-codegen" } actix-http = { git = "https://github.com/actix/actix-http.git" } @@ -78,16 +78,13 @@ encoding = "0.2" futures = "0.1" hashbrown = "0.1.8" log = "0.4" -lazy_static = "1.2" mime = "0.3" net2 = "0.2.33" -num_cpus = "1.10" parking_lot = "0.7" regex = "1.0" serde = "1.0" serde_json = "1.0" serde_urlencoded = "^0.5.3" -threadpool = "1.7" time = "0.1" url = { version="1.7", features=["query_encoding"] } diff --git a/src/blocking.rs b/src/blocking.rs deleted file mode 100644 index fc9cec299..000000000 --- a/src/blocking.rs +++ /dev/null @@ -1,93 +0,0 @@ -//! Thread pool for blocking operations - -use std::fmt; - -use derive_more::Display; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; -use parking_lot::Mutex; -use threadpool::ThreadPool; - -use crate::ResponseError; - -/// Env variable for default cpu pool size -const ENV_CPU_POOL_VAR: &str = "ACTIX_CPU_POOL"; - -lazy_static::lazy_static! { - pub(crate) static ref DEFAULT_POOL: Mutex = { - let default = match std::env::var(ENV_CPU_POOL_VAR) { - Ok(val) => { - if let Ok(val) = val.parse() { - val - } else { - log::error!("Can not parse ACTIX_CPU_POOL value"); - num_cpus::get() * 5 - } - } - Err(_) => num_cpus::get() * 5, - }; - Mutex::new( - threadpool::Builder::new() - .thread_name("actix-web".to_owned()) - .num_threads(default) - .build(), - ) - }; -} - -thread_local! { - static POOL: ThreadPool = { - DEFAULT_POOL.lock().clone() - }; -} - -/// Blocking operation execution error -#[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} - -impl ResponseError for BlockingError {} - -/// Execute blocking function on a thread pool, returns future that resolves -/// to result of the function execution. -pub fn run(f: F) -> CpuFuture -where - F: FnOnce() -> Result + Send + 'static, - I: Send + 'static, - E: Send + fmt::Debug + 'static, -{ - let (tx, rx) = oneshot::channel(); - POOL.with(|pool| { - pool.execute(move || { - if !tx.is_canceled() { - let _ = tx.send(f()); - } - }) - }); - - CpuFuture { rx } -} - -/// Blocking operation completion future. It resolves with results -/// of blocking function execution. -pub struct CpuFuture { - rx: oneshot::Receiver>, -} - -impl Future for CpuFuture { - type Item = I; - type Error = BlockingError; - - fn poll(&mut self) -> Poll { - let res = - futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled)); - match res { - Ok(val) => Ok(Async::Ready(val)), - Err(err) => Err(BlockingError::Error(err)), - } - } -} diff --git a/src/error.rs b/src/error.rs index fd0ee998f..54ca74dc2 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,8 +4,6 @@ pub use actix_http::error::*; use derive_more::{Display, From}; use url::ParseError as UrlParseError; -pub use crate::blocking::BlockingError; - /// Errors which can occur when attempting to generate resource uri. #[derive(Debug, PartialEq, Display, From)] pub enum UrlGenerationError { diff --git a/src/lib.rs b/src/lib.rs index 62f6399d6..0094818d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ mod app; mod app_service; -mod blocking; mod config; pub mod error; mod extract; @@ -53,7 +52,6 @@ pub mod dev { //! ``` pub use crate::app::AppRouter; - pub use crate::blocking::CpuFuture; pub use crate::config::{AppConfig, ServiceConfig}; pub use crate::info::ConnectionInfo; pub use crate::rmap::ResourceMap; @@ -67,6 +65,7 @@ pub mod dev { Extensions, Payload, PayloadStream, RequestHead, ResponseHead, }; pub use actix_router::{Path, ResourceDef, ResourcePath, Url}; + pub use actix_rt::blocking::CpuFuture; pub use actix_server::Server; pub(crate) fn insert_slash(path: &str) -> String { @@ -80,12 +79,12 @@ pub mod dev { pub mod web { use actix_http::{http::Method, Response}; + use actix_rt::blocking::{self, CpuFuture}; use futures::IntoFuture; pub use actix_http::Response as HttpResponse; pub use bytes::{Bytes, BytesMut}; - use crate::blocking::CpuFuture; use crate::extract::FromRequest; use crate::handler::{AsyncFactory, Factory}; use crate::resource::Resource; @@ -258,6 +257,6 @@ pub mod web { I: Send + 'static, E: Send + std::fmt::Debug + 'static, { - crate::blocking::run(f) + blocking::run(f) } }