1
0
Fork 0
mirror of https://github.com/actix/actix-web.git synced 2024-12-19 06:36:36 +00:00

Add synchronous adaptors for client side

Closes #269
This commit is contained in:
Douman 2018-06-08 12:19:57 +03:00
parent c8528e8920
commit 0896e32ddb
6 changed files with 293 additions and 0 deletions

View file

@ -77,6 +77,7 @@ time = "0.1"
encoding = "0.2"
language-tags = "0.2"
lazy_static = "1.0"
crossbeam-channel = "0.2"
url = { version="1.7", features=["query_encoding"] }
cookie = { version="0.10", features=["percent-encode"] }
brotli2 = { version="^0.3.2", optional = true }

View file

@ -31,6 +31,7 @@ mod pipeline;
mod request;
mod response;
mod writer;
pub mod sync;
pub use self::body::{ClientBody, ClientBodyStream};
pub use self::connector::{

View file

@ -250,6 +250,11 @@ impl ClientRequest {
send
}
}
/// Transforms request into synchronous.
pub fn sync(self) -> super::sync::ClientRequest {
super::sync::ClientRequest(self)
}
}
impl fmt::Debug for ClientRequest {

View file

@ -51,6 +51,14 @@ impl ClientResponse {
self.1 = Some(pl);
}
pub(crate) fn into_parts(self) -> (Rc<UnsafeCell<ClientMessage>>, Option<Box<Pipeline>>) {
(self.0, self.1)
}
pub(crate) fn from_parts(msg: ClientMessage, pipeline: Option<Box<Pipeline>>) -> Self {
ClientResponse(Rc::new(UnsafeCell::new(msg)), pipeline)
}
#[inline]
fn as_ref(&self) -> &ClientMessage {
unsafe { &*self.0.get() }

263
src/client/sync.rs Normal file
View file

@ -0,0 +1,263 @@
//! Sync version of client's HTTP primitives
extern crate serde;
extern crate futures;
extern crate bytes;
extern crate tokio;
extern crate http;
extern crate encoding;
extern crate mime;
extern crate serde_urlencoded;
extern crate serde_json;
extern crate crossbeam_channel;
use std::rc::Rc;
use std::io;
use std::thread;
use std::ops::{Deref, DerefMut};
use self::mime::Mime;
use self::encoding::all::UTF_8;
use self::encoding::label::encoding_from_whatwg_label;
use self::encoding::types::{DecoderTrap, Encoding};
use self::encoding::EncodingRef;
use self::http::{header, HeaderMap};
use self::bytes::Bytes;
use self::serde::de::DeserializeOwned;
use self::futures::{Future, Stream};
use self::futures::sync::{oneshot};
use super::response::ClientMessage;
use super::pipeline::Pipeline;
use ::httpmessage::HttpMessage;
use ::error::{JsonPayloadError, PayloadError, UrlencodedError, ContentTypeError};
use ::dev::{JsonBody, MessageBody, UrlEncoded};
use super::SendRequestError;
mod async {
pub use super::super::request::ClientRequest;
pub use super::super::response::ClientResponse;
}
/// An synchronous adaptor for body
pub trait SyncBody<T, E>: Future<Item=T, Error=E> {
/// Reads entire body synchronously.
///
/// Note: You should not attempt to read the whole future on the same thread
/// as actix event loop.
fn collect(self) -> Result<T, E> where Self: Sized {
self.wait()
}
}
impl<T> SyncBody<Bytes, PayloadError> for MessageBody<T>
where T: HttpMessage + Stream<Item=Bytes, Error=PayloadError> + 'static {}
impl<T, U: DeserializeOwned + 'static> SyncBody<U, UrlencodedError> for UrlEncoded<T, U>
where T: HttpMessage + Stream<Item=Bytes, Error=PayloadError> + 'static {}
impl<T, U: DeserializeOwned + 'static> SyncBody<U, JsonPayloadError> for JsonBody<T, U>
where T: HttpMessage + Stream<Item=Bytes, Error=PayloadError> + 'static {}
/// An synchronous HTTP Client Request
pub struct ClientRequest(pub async::ClientRequest);
impl Deref for ClientRequest {
type Target = async::ClientRequest;
fn deref(&self) -> &async::ClientRequest {
&self.0
}
}
impl DerefMut for ClientRequest {
fn deref_mut(&mut self) -> &mut async::ClientRequest {
&mut self.0
}
}
/// An synchronous HTTP Client Response
pub struct ClientResponse {
sender: ClientSender,
message: ClientMessage,
pipeline: Option<Box<Pipeline>>,
}
impl ClientResponse {
fn new(message: ClientMessage, pipeline: Option<Box<Pipeline>>, sender: ClientSender) -> Self {
Self {
sender,
message,
pipeline
}
}
///Transforms self into asynchronous response
pub fn into_async(self) -> async::ClientResponse {
async::ClientResponse::from_parts(self.message, self.pipeline)
}
/// Retrieves headers.
pub fn headers(&self) -> &HeaderMap {
&self.message.headers
}
/// Read the request content type. If request does not contain
/// *Content-Type* header, empty str get returned.
pub fn content_type(&self) -> &str {
if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
return content_type.split(';').next().unwrap().trim();
}
}
""
}
/// Convert the request content type to a known mime type.
fn mime_type(&self) -> Result<Option<Mime>, ContentTypeError> {
if let Some(content_type) = self.headers().get(header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
return match content_type.parse() {
Ok(mt) => Ok(Some(mt)),
Err(_) => Err(ContentTypeError::ParseError),
};
} else {
return Err(ContentTypeError::ParseError);
}
}
Ok(None)
}
/// Get content type encoding
///
/// UTF-8 is used by default, If request charset is not set.
fn encoding(&self) -> Result<EncodingRef, ContentTypeError> {
if let Some(mime_type) = self.mime_type()? {
if let Some(charset) = mime_type.get_param("charset") {
if let Some(enc) = encoding_from_whatwg_label(charset.as_str()) {
Ok(enc)
} else {
Err(ContentTypeError::UnknownEncoding)
}
} else {
Ok(UTF_8)
}
} else {
Ok(UTF_8)
}
}
///Synchronously receive response body as raw bytes.
pub fn sync_body(self) -> Result<Bytes, PayloadError> {
let (sender, receiver) = oneshot::channel();
self.sender.send((SyncJob::CollectBody(self.message, self.pipeline), sender));
match receiver.wait() {
Ok(rsp) => match rsp {
SyncJobResult::Body(result) => result,
_ => unreachable!()
},
Err(_canceled) => panic!("worker thread panicked!"),
}
}
///Synchronously receive response body as url encoded form
pub fn sync_urlencoded<T: DeserializeOwned>(self) -> Result<T, UrlencodedError> {
// check content type
if self.content_type().to_lowercase() != "application/x-www-form-urlencoded" {
return Err(UrlencodedError::ContentType);
}
let encoding = self.encoding().map_err(|_| UrlencodedError::ContentType)?;
let body = self.sync_body().map_err(|error| UrlencodedError::from(error))?;
let enc: *const Encoding = encoding as *const Encoding;
if enc == UTF_8 {
serde_urlencoded::from_bytes::<T>(&body).map_err(|_| UrlencodedError::Parse)
} else {
let body = encoding .decode(&body, DecoderTrap::Strict) .map_err(|_| UrlencodedError::Parse)?;
serde_urlencoded::from_str::<T>(&body).map_err(|_| UrlencodedError::Parse)
}
}
///Synchronously receive response body as json.
pub fn sync_json<T: DeserializeOwned>(self) -> Result<T, JsonPayloadError> {
let json = match self.mime_type() {
Ok(Some(mime)) => mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON),
_ => false
};
if !json {
return Err(JsonPayloadError::ContentType);
}
let body = self.sync_body().map_err(|error| JsonPayloadError::from(error))?;
serde_json::from_slice::<T>(&body).map_err(|error| JsonPayloadError::from(error))
}
}
enum SyncJob {
Request(async::ClientRequest),
CollectBody(ClientMessage, Option<Box<Pipeline>>),
}
enum SyncJobResult {
Response(ClientMessage, Option<Box<Pipeline>>),
SendError(SendRequestError),
Body(Result<Bytes, PayloadError>),
}
type ClientSender = crossbeam_channel::Sender<(SyncJob, oneshot::Sender<SyncJobResult>)>;
///Synchronous HTTP Client
pub struct Client {
sender: ClientSender,
_worker: thread::JoinHandle<()>
}
impl Client {
///Creates new instance.
pub fn new() -> io::Result<Self> {
let (sender, receiver) = crossbeam_channel::unbounded();
let worker = thread::Builder::new().name("actix-web-sync-worker".into()).spawn(move || {
while let Some((req, receiver)) = receiver.recv() {
let receiver: oneshot::Sender<SyncJobResult> = receiver;
let _ = match req {
SyncJob::Request(req) => match req.send().wait() {
Ok(rsp) => {
let (msg, pipeline) = rsp.into_parts();
//TODO: Consider Do we need Rc actually?
let msg = match Rc::try_unwrap(msg) {
Ok(msg) => msg.into_inner(),
Err(_) => panic!("Unable to unwrap")
};
receiver.send(SyncJobResult::Response(msg, pipeline))
},
Err(error) => receiver.send(SyncJobResult::SendError(error))
},
SyncJob::CollectBody(message, pipeline) => {
let body = async::ClientResponse::from_parts(message, pipeline).body();
receiver.send(SyncJobResult::Body(body.collect()))
},
};
}
})?;
Ok(Self {
sender: sender,
_worker: worker
})
}
///Sends HTTP request synchronously.
pub fn send(&self, request: async::ClientRequest) -> Result<ClientResponse, SendRequestError> {
let (sender, receiver) = oneshot::channel();
self.sender.send((SyncJob::Request(request), sender));
match receiver.wait() {
Ok(rsp) => match rsp {
SyncJobResult::Response(message, pipeline) => Ok(ClientResponse::new(message, pipeline, self.sender.clone())),
SyncJobResult::SendError(error) => Err(error),
_ => unreachable!()
},
Err(_canceled) => panic!("worker thread panicked!"),
}
}
}

View file

@ -458,3 +458,18 @@ fn test_default_headers() {
"\""
)));
}
#[test]
fn test_sync_client() {
use actix_web::client::sync;
actix::System::run(|| {
actix::Arbiter::system().do_send(actix::msgs::SystemExit(0));
let client = sync::Client::new().expect("To create sync client");
let srv = test::TestServer::new(|app| app.handler(|_| HttpResponse::Ok().body(STR)));
let request = srv.get().finish().expect("To create request");
//TODO: seems to fail to spawn job
let response = client.send(request).expect("To get successful response");
});
}