From a877b5c9e86327379fa3d6d55bcdc1312632fcc9 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 11 Nov 2018 12:42:10 -0600 Subject: [PATCH] Apply backpressure in pushers, portmap --- examples/server-jobs-example/.env | 2 +- jobs-server-tokio/Cargo.toml | 2 +- jobs-server-tokio/src/server/portmap.rs | 2 +- jobs-server-tokio/src/server/push.rs | 3 ++- jobs-server-tokio/src/worker/config.rs | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/server-jobs-example/.env b/examples/server-jobs-example/.env index 711bfdf..a6ece83 100644 --- a/examples/server-jobs-example/.env +++ b/examples/server-jobs-example/.env @@ -1 +1 @@ -RUST_LOG=server_jobs_example,jobs_server_tokio=info +RUST_LOG=info diff --git a/jobs-server-tokio/Cargo.toml b/jobs-server-tokio/Cargo.toml index 9c6f40f..1b25ad9 100644 --- a/jobs-server-tokio/Cargo.toml +++ b/jobs-server-tokio/Cargo.toml @@ -12,7 +12,7 @@ serde = "1.0" serde_json = "1.0" tokio = "0.1" tokio-threadpool = "0.1" -tokio-zmq = "0.5" +tokio-zmq = "0.6.1" zmq = "0.8" [dependencies.jobs-core] diff --git a/jobs-server-tokio/src/server/portmap.rs b/jobs-server-tokio/src/server/portmap.rs index b1267c2..0bde1f6 100644 --- a/jobs-server-tokio/src/server/portmap.rs +++ b/jobs-server-tokio/src/server/portmap.rs @@ -40,7 +40,7 @@ impl PortMapConfig { config: _, } = self; - let (sink, stream) = rep.sink_stream().split(); + let (sink, stream) = rep.sink_stream(1).split(); let fut = stream .from_err::() diff --git a/jobs-server-tokio/src/server/push.rs b/jobs-server-tokio/src/server/push.rs index be4cf1a..79f8d37 100644 --- a/jobs-server-tokio/src/server/push.rs +++ b/jobs-server-tokio/src/server/push.rs @@ -57,7 +57,7 @@ impl PushConfig { .from_err() .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone())) .flatten() - .forward(pusher.sink()) + .forward(pusher.sink(25)) .map(move |_| { info!( "Pusher for queue {} is shutting down", @@ -137,6 +137,7 @@ impl ResetPushConfig { fn build(self) -> impl Future { lazy(|| { + info!("Building and spawning new server"); let pusher = Push::builder(self.config.context.clone()) .bind(&self.address) .build()?; diff --git a/jobs-server-tokio/src/worker/config.rs b/jobs-server-tokio/src/worker/config.rs index 00130e1..601c46a 100644 --- a/jobs-server-tokio/src/worker/config.rs +++ b/jobs-server-tokio/src/worker/config.rs @@ -57,7 +57,7 @@ impl Worker { .stream() .from_err::() .and_then(move |multipart| wrap_processing(multipart, &processors)) - .forward(push.sink()) + .forward(push.sink(2)) .map(move |_| info!("worker for queue {} is shutting down", queue)) .map_err(|e| { error!("Error processing job, {}", e);