threadshare: Turn the single-threaded executor until no futures are left to be run before waiting

Otherwise in e.g. a pipeline like
  ts-udpsrc ! ts-queue ! fakesink
the first turn would only get a packet and queue it up, then we would
wait due to throttling and only then we would forward the packet from
the queue (but not poll the socket again), wait again due to throttling
and only then poll and get the next packet.

See https://github.com/tokio-rs/tokio/issues/310
This commit is contained in:
Sebastian Dröge 2018-04-12 08:59:39 +02:00
parent b56e1a9873
commit 2dfca38977
2 changed files with 13 additions and 6 deletions

View file

@ -10,11 +10,11 @@ gstreamer-sys = { git = "https://github.com/sdroege/gstreamer-sys" }
glib = { git = "https://github.com/gtk-rs/glib" }
gstreamer = { git = "https://github.com/sdroege/gstreamer-rs" }
gst-plugin = { git = "https://github.com/sdroege/gst-plugin-rs" }
tokio = { git = "https://github.com/tokio-rs/tokio" }
tokio-reactor = { git = "https://github.com/tokio-rs/tokio" }
tokio-executor = { git = "https://github.com/tokio-rs/tokio" }
tokio-timer = { git = "https://github.com/tokio-rs/tokio" }
tokio-threadpool = { git = "https://github.com/tokio-rs/tokio" }
tokio = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
tokio-reactor = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
tokio-executor = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
tokio-timer = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
tokio-threadpool = { git = "https://github.com/sdroege/tokio", branch = "fair-turn" }
futures = "0.1"
lazy_static = "1.0"
either = "1.0"

View file

@ -134,6 +134,8 @@ impl IOContextRunner {
::tokio_reactor::with_default(&handle, &mut enter, |mut enter| {
::tokio_timer::with_default(&timer_handle, &mut enter, |enter| loop {
use tokio::executor::current_thread::Turn;
let now = time::Instant::now();
if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING {
@ -148,7 +150,11 @@ impl IOContextRunner {
}
gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name);
current_thread.enter(enter).turn(None).unwrap();
while let Turn(true) = current_thread
.enter(enter)
.turn(Some(time::Duration::from_millis(0)))
.unwrap()
{}
gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name);
let elapsed = now.elapsed();
@ -277,6 +283,7 @@ impl IOContext {
let shutdown = IOContextRunner::start(name, wait, reactor);
// FIXME: The executor threads are not throttled at all, only the reactor
let mut pool_builder = thread_pool::Builder::new();
pool_builder
.around_worker(move |w, enter| {