diff --git a/gst-plugin-threadshare/src/udpsrc.rs b/gst-plugin-threadshare/src/udpsrc.rs index ae5c09f2..cccfd0c7 100644 --- a/gst-plugin-threadshare/src/udpsrc.rs +++ b/gst-plugin-threadshare/src/udpsrc.rs @@ -67,6 +67,7 @@ struct IOContextRunner { impl IOContextRunner { fn start_single_threaded( name: &str, + wait: u32, reactor: reactor::Reactor, ) -> (IOContextExecutor, IOContextShutdown) { let handle = reactor.handle().clone(); @@ -84,7 +85,7 @@ impl IOContextRunner { pending_futures: Some(pending_futures), }; let join = thread::spawn(move || { - runner.run(reactor); + runner.run(wait, reactor); }); let executor = IOContextExecutor { @@ -102,7 +103,7 @@ impl IOContextRunner { (executor, shutdown) } - fn start(name: &str, reactor: reactor::Reactor) -> IOContextShutdown { + fn start(name: &str, wait: u32, reactor: reactor::Reactor) -> IOContextShutdown { let handle = reactor.handle().clone(); let shutdown = Arc::new(atomic::AtomicUsize::new(RUNNING)); let shutdown_clone = shutdown.clone(); @@ -114,7 +115,7 @@ impl IOContextRunner { pending_futures: None, }; let join = thread::spawn(move || { - runner.run(reactor); + runner.run(wait, reactor); }); let shutdown = IOContextShutdown { @@ -127,7 +128,10 @@ impl IOContextRunner { shutdown } - fn run(&mut self, reactor: reactor::Reactor) { + fn run(&mut self, wait: u32, reactor: reactor::Reactor) { + use std::time; + let wait = time::Duration::from_millis(wait as u64); + gst_debug!(CONTEXT_CAT, "Started reactor thread '{}'", self.name); if let Some(ref pending_futures) = self.pending_futures { @@ -139,6 +143,8 @@ impl IOContextRunner { let mut current_thread = current_thread::CurrentThread::new_with_park(reactor); ::tokio_reactor::with_default(&handle, &mut enter, |enter| loop { + let now = time::Instant::now(); + if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING { break; } @@ -154,11 +160,19 @@ impl IOContextRunner { gst_trace!(CONTEXT_CAT, "Turning current thread '{}'", self.name); current_thread.enter(enter).turn(None).unwrap(); gst_trace!(CONTEXT_CAT, "Turned current thread '{}'", self.name); + + let elapsed = now.elapsed(); + if elapsed < wait { + gst_trace!(CONTEXT_CAT, "Waiting for {:?} before polling again", wait - elapsed); + thread::sleep(wait - elapsed); + } }); } else { let mut reactor = reactor; loop { + let now = time::Instant::now(); + if self.shutdown.load(atomic::Ordering::SeqCst) > RUNNING { break; } @@ -166,6 +180,12 @@ impl IOContextRunner { gst_trace!(CONTEXT_CAT, "Turning reactor '{}'", self.name); reactor.turn(None).unwrap(); gst_trace!(CONTEXT_CAT, "Turned reactor '{}'", self.name); + + let elapsed = now.elapsed(); + if elapsed < wait { + gst_trace!(CONTEXT_CAT, "Waiting for {:?} before polling again", wait - elapsed); + thread::sleep(wait - elapsed); + } } } } @@ -235,7 +255,7 @@ impl Drop for IOContextInner { } impl IOContext { - fn new(name: &str, n_threads: isize) -> Self { + fn new(name: &str, n_threads: isize, wait: u32) -> Self { let mut contexts = CONTEXTS.lock().unwrap(); if let Some(context) = contexts.get(name) { if let Some(context) = context.upgrade() { @@ -249,7 +269,7 @@ impl IOContext { let (pool, shutdown) = if n_threads >= 0 { let handle = reactor.handle().clone(); - let shutdown = IOContextRunner::start(name, reactor); + let shutdown = IOContextRunner::start(name, wait, reactor); let mut pool_builder = thread_pool::Builder::new(); pool_builder.around_worker(move |w, enter| { @@ -263,7 +283,7 @@ impl IOContext { } (Either::Left(pool_builder.build()), shutdown) } else { - let (executor, shutdown) = IOContextRunner::start_single_threaded(name, reactor); + let (executor, shutdown) = IOContextRunner::start_single_threaded(name, wait, reactor); (Either::Right(executor), shutdown) }; @@ -381,7 +401,8 @@ impl Socket { .for_each(move |buffer| { let res = func(buffer); match res { - Ok(()) => future::Either::A(YieldOnce::new()), + Ok(()) => future::Either::A(Ok(()).into_future()), + //Ok(()) => future::Either::A(YieldOnce::new()), Err(err) => future::Either::B(Err(err).into_future()), } }) @@ -552,6 +573,7 @@ const DEFAULT_CAPS: Option = None; const DEFAULT_MTU: u32 = 1500; const DEFAULT_CONTEXT: &'static str = ""; const DEFAULT_CONTEXT_THREADS: i32 = 0; +const DEFAULT_CONTEXT_WAIT: u32 = 0; #[derive(Debug, Clone)] struct Settings { @@ -561,6 +583,7 @@ struct Settings { mtu: u32, context: String, context_threads: i32, + context_wait: u32, } impl Default for Settings { @@ -572,11 +595,12 @@ impl Default for Settings { mtu: DEFAULT_MTU, context: DEFAULT_CONTEXT.into(), context_threads: DEFAULT_CONTEXT_THREADS, + context_wait: DEFAULT_CONTEXT_WAIT, } } } -static PROPERTIES: [Property; 6] = [ +static PROPERTIES: [Property; 7] = [ Property::String( "address", "Address", @@ -622,6 +646,14 @@ static PROPERTIES: [Property; 6] = [ DEFAULT_CONTEXT_THREADS, PropertyMutability::ReadWrite, ), + Property::UInt( + "context-wait", + "Context Wait", + "Throttle poll loop to run at most once every this many ms", + (0, 1000), + DEFAULT_CONTEXT_WAIT, + PropertyMutability::ReadWrite, + ), ]; struct State { @@ -758,7 +790,7 @@ impl UdpSrc { // TODO: Error handling let mut state = self.state.lock().unwrap(); - let io_context = IOContext::new(&settings.context, settings.context_threads as isize); + let io_context = IOContext::new(&settings.context, settings.context_threads as isize, settings.context_wait); let addr: IpAddr = match settings.address { None => return Err(()), @@ -926,6 +958,10 @@ impl ObjectImpl for UdpSrc { let mut settings = self.settings.lock().unwrap(); settings.context_threads = value.get().unwrap(); } + Property::UInt("context-wait", ..) => { + let mut settings = self.settings.lock().unwrap(); + settings.context_wait = value.get().unwrap(); + } _ => unimplemented!(), } } @@ -958,6 +994,10 @@ impl ObjectImpl for UdpSrc { let mut settings = self.settings.lock().unwrap(); Ok(settings.context_threads.to_value()) } + Property::UInt("context-wait", ..) => { + let mut settings = self.settings.lock().unwrap(); + Ok(settings.context_wait.to_value()) + } _ => unimplemented!(), } }