Apply backpressure in pushers, portmap

This commit is contained in:
asonix 2018-11-11 12:42:10 -06:00
parent c0fcedf8a1
commit a877b5c9e8
5 changed files with 6 additions and 5 deletions

View file

@ -1 +1 @@
RUST_LOG=server_jobs_example,jobs_server_tokio=info RUST_LOG=info

View file

@ -12,7 +12,7 @@ serde = "1.0"
serde_json = "1.0" serde_json = "1.0"
tokio = "0.1" tokio = "0.1"
tokio-threadpool = "0.1" tokio-threadpool = "0.1"
tokio-zmq = "0.5" tokio-zmq = "0.6.1"
zmq = "0.8" zmq = "0.8"
[dependencies.jobs-core] [dependencies.jobs-core]

View file

@ -40,7 +40,7 @@ impl PortMapConfig {
config: _, config: _,
} = self; } = self;
let (sink, stream) = rep.sink_stream().split(); let (sink, stream) = rep.sink_stream(1).split();
let fut = stream let fut = stream
.from_err::<Error>() .from_err::<Error>()

View file

@ -57,7 +57,7 @@ impl PushConfig {
.from_err() .from_err()
.and_then(move |_| dequeue_jobs(storage.clone(), queue.clone())) .and_then(move |_| dequeue_jobs(storage.clone(), queue.clone()))
.flatten() .flatten()
.forward(pusher.sink()) .forward(pusher.sink(25))
.map(move |_| { .map(move |_| {
info!( info!(
"Pusher for queue {} is shutting down", "Pusher for queue {} is shutting down",
@ -137,6 +137,7 @@ impl ResetPushConfig {
fn build(self) -> impl Future<Item = (), Error = Error> { fn build(self) -> impl Future<Item = (), Error = Error> {
lazy(|| { lazy(|| {
info!("Building and spawning new server");
let pusher = Push::builder(self.config.context.clone()) let pusher = Push::builder(self.config.context.clone())
.bind(&self.address) .bind(&self.address)
.build()?; .build()?;

View file

@ -57,7 +57,7 @@ impl Worker {
.stream() .stream()
.from_err::<Error>() .from_err::<Error>()
.and_then(move |multipart| wrap_processing(multipart, &processors)) .and_then(move |multipart| wrap_processing(multipart, &processors))
.forward(push.sink()) .forward(push.sink(2))
.map(move |_| info!("worker for queue {} is shutting down", queue)) .map(move |_| info!("worker for queue {} is shutting down", queue))
.map_err(|e| { .map_err(|e| {
error!("Error processing job, {}", e); error!("Error processing job, {}", e);