diff --git a/generic/threadshare/Cargo.toml b/generic/threadshare/Cargo.toml index 8b0a0cd5..6b21337b 100644 --- a/generic/threadshare/Cargo.toml +++ b/generic/threadshare/Cargo.toml @@ -21,8 +21,9 @@ gst-net = { package = "gstreamer-net", git = "https://gitlab.freedesktop.org/gst gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gstreamer-sys = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys" } pin-project = "0.4" +once_cell = "1" tokio = { git = "https://github.com/fengalin/tokio", branch = "fengalin/throttling", features = ["io-util", "macros", "rt-core", "sync", "stream", "time", "tcp", "udp", "rt-util"] } -futures = "0.3" +futures = { version = "0.3", features = ["thread-pool"] } lazy_static = "1.0" rand = "0.7" net2 = "0.2" diff --git a/generic/threadshare/src/runtime/task.rs b/generic/threadshare/src/runtime/task.rs index be71c6d2..636f486b 100644 --- a/generic/threadshare/src/runtime/task.rs +++ b/generic/threadshare/src/runtime/task.rs @@ -20,7 +20,7 @@ use futures::channel::mpsc as async_mpsc; use futures::channel::oneshot; -use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture}; +use futures::future::{self, abortable, AbortHandle, Aborted, BoxFuture, RemoteHandle}; use futures::prelude::*; use futures::stream::StreamExt; @@ -32,7 +32,7 @@ use std::stringify; use std::sync::{Arc, Mutex, MutexGuard}; use super::executor::{block_on_or_add_sub_task, TaskId}; -use super::{Context, JoinHandle, RUNTIME_CAT}; +use super::{Context, RUNTIME_CAT}; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy)] pub enum TaskState { @@ -277,18 +277,9 @@ impl fmt::Debug for TransitionRequest { #[derive(Debug)] struct TaskInner { - // The state machine needs an un-throttling Context because otherwise - // `transition_rx.next()` would throttle at each transition request. - // Since pipelines serialize state changes, this would lead to long starting / stopping - // when a pipeline consists in a large number of elements. - // See also the comment about transition_tx below. - state_machine_context: Context, - // The TaskImpl processings are spawned on the Task Context by the state machine. context: Option, state: TaskState, - state_machine_handle: Option>, - // The transition channel allows serializing transitions handling, - // preventing race conditions when transitions are run in //. + state_machine_handle: Option>, transition_tx: Option>, prepare_abort_handle: Option, loop_abort_handle: Option, @@ -298,7 +289,6 @@ struct TaskInner { impl Default for TaskInner { fn default() -> Self { TaskInner { - state_machine_context: Context::acquire("state_machine", 0).unwrap(), context: None, state: TaskState::Unprepared, state_machine_handle: None, @@ -458,24 +448,17 @@ impl Task { inner.state = TaskState::Preparing; - gst_log!(RUNTIME_CAT, "Starting task state machine"); + gst_log!(RUNTIME_CAT, "Spawning task state machine"); // FIXME allow configuration of the channel buffer size, // this determines the contention on the Task. let (transition_tx, transition_rx) = async_mpsc::channel(4); let state_machine = StateMachine::new(Box::new(task_impl), transition_rx); - let (transition_req, _) = TransitionRequest::new(Transition::Prepare); - inner.state_machine_handle = Some(inner.state_machine_context.spawn(state_machine.run( - Arc::clone(&self.0), - context.clone(), - transition_req, - ))); + inner.state_machine_handle = Some(self.spawn_state_machine(state_machine, &context)); inner.transition_tx = Some(transition_tx); inner.context = Some(context); - gst_log!(RUNTIME_CAT, "Task state machine started"); - Ok(TransitionStatus::Async { transition: Transition::Prepare, origin, @@ -540,7 +523,7 @@ impl Task { state_machine_handle, ); let join_fut = block_on_or_add_sub_task(async { - state_machine_handle.await.unwrap(); + state_machine_handle.await; drop(transition_tx); drop(context); @@ -689,6 +672,22 @@ impl Task { Ok(TransitionStatus::Async { transition, origin }) }) } + + fn spawn_state_machine( + &self, + state_machine: StateMachine, + context: &Context, + ) -> RemoteHandle<()> { + use futures::executor::ThreadPool; + use futures::task::SpawnExt; + use once_cell::sync::OnceCell; + + static EXECUTOR: OnceCell = OnceCell::new(); + EXECUTOR + .get_or_init(|| ThreadPool::builder().pool_size(1).create().unwrap()) + .spawn_with_handle(state_machine.run(Arc::clone(&self.0), context.clone())) + .unwrap() + } } struct StateMachine { @@ -783,15 +782,11 @@ impl StateMachine { } } - async fn run( - mut self, - task_inner: Arc>, - context: Context, - mut transition_req: TransitionRequest, - ) { + async fn run(mut self, task_inner: Arc>, context: Context) { gst_trace!(RUNTIME_CAT, "Preparing task"); { + let (mut transition_req, _) = TransitionRequest::new(Transition::Prepare); let res = exec_hook!( self, prepare,